Hi Kamal,

Sorry for the late review.
Thanks for the KIP, this will improve the transaction reading for remote
storage.
Overall LGTM, just one minor thought:

Currently, we only store the `TxnIndexEmpty` bool value in the segment
metadata.
Do you think it would be helpful to store the "least abort start offset in
the segment" instead, with -1 meaning there is no txnIndex? That would give
us a way to know whether we need to fetch this txn index or not.
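
To illustrate the idea, here is a rough sketch in Java. The class and method
names are hypothetical and not part of KIP-1058, and the comparison against
the upper bound of the fetch range is only my assumption about how such a
value would be consulted:

```java
// Hypothetical sketch; these names are illustrative and do not exist in Kafka.
final class TxnIndexHint {
    /** Sentinel meaning the segment has no transaction index entries. */
    static final long NO_TXN_INDEX = -1L;

    private TxnIndexHint() {}

    /**
     * Decides whether the remote txn index of a segment has to be fetched.
     *
     * @param leastAbortStartOffset smallest start offset of any aborted
     *        transaction recorded for the segment, or NO_TXN_INDEX if none
     * @param fetchUpperBoundOffset last offset (inclusive) the fetch covers
     */
    static boolean needsTxnIndexFetch(long leastAbortStartOffset,
                                      long fetchUpperBoundOffset) {
        if (leastAbortStartOffset == NO_TXN_INDEX) {
            return false; // nothing was aborted in this segment
        }
        // Assumption: an aborted transaction that starts after the fetch
        // range cannot affect the records being returned.
        return leastAbortStartOffset <= fetchUpperBoundOffset;
    }

    public static void main(String[] args) {
        System.out.println(needsTxnIndexFetch(NO_TXN_INDEX, 500L)); // false
        System.out.println(needsTxnIndexFetch(900L, 500L));         // false
        System.out.println(needsTxnIndexFetch(300L, 500L));         // true
    }
}
```

With only the bool flag, the second case above would still require
downloading the index.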

Thanks.
Luke

On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi all,
>
> If there are no more comments, I'll start a voting thread soon.
>
> Thanks,
> Kamal
>
> On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Bumping this thread again for review!
> >
> > Reduced the scope of the proposal to minimum. We will be adding only one
> > field (txnIdxEmpty) to the
> > RemoteLogSegmentMetadata event which is backward compatible. PTAL.
> >
> > Thanks,
> > Kamal
> >
> >
> > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> >> Bumping this thread for KIP review!
> >>
> >> We can go for the simplest solution that is proposed in this KIP and
> >> it can be improved in the subsequent iteration. PTAL.
> >>
> >> Thanks,
> >> Kamal
> >>
> >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
> >> kamal.chandraprak...@gmail.com> wrote:
> >>
> >>> Hi Divij,
> >>>
> >>> Thanks for the review! And, sorry for the late reply.
> >>>
> >>> From the UnifiedLog.scala
> >>> <
> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427
> >
> >>> doc:
> >>>
> >>> """
> >>> The last stable offset (LSO) is defined as the first offset such that
> >>> all lower offsets have been "decided."
> >>>    * Non-transactional messages are considered decided immediately, but
> >>> transactional messages are only decided when
> >>>    * the corresponding COMMIT or ABORT marker is written. This implies
> >>> that the last stable offset will be equal
> >>>    * to the high watermark if there are no transactional messages in
> the
> >>> log. Note also that the LSO cannot advance
> >>>    * beyond the high watermark.
> >>> """
> >>> While rolling the active segment to passive, if the LSO equals the HW, then
> >>> all the messages in that segment are
> >>> decided and we can store the `lastStableOffsetLag` as an attribute in
> >>> the rolled segment. We can then propagate
> >>> the `lastStableOffsetLag` information in the RemoteLogMetadata events.
> >>>
> >>> While reading the remote log segment, if the `lastStableOffsetLag` is 0,
> >>> then there is no need to traverse the subsequent segments for aborted
> >>> transactions. This covers the dominant case where the partition had no
> >>> transactions at all.
> >>>
> >>> With log compaction, the shrunken segments might get merged. One option
> >>> is to take the max of `lastStableOffsetLag`
> >>> and store it in the new LogSegment. Since tiered storage does not
> >>> support compacted topics / historical compacted
> >>> topics, we can omit this case.
> >>>
> >>> If this approach looks good, I can update the KIP with the details.
> >>>
> >>> --
> >>> Kamal
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Kamal
> >>>>
> >>>> Thanks for the bump. I have been thinking about this passively for the
> >>>> past
> >>>> few days.
> >>>>
> >>>> The simplest solution is to store a state at segment level metadata.
> The
> >>>> state should specify whether the trx index is empty or not. It would
> be
> >>>> populated during segment archival. We would then iterate over the
> >>>> metadata
> >>>> for future segments without having to make a remote call to download
> the
> >>>> trx index itself.
> >>>>
> >>>> The other solution for storing state at a partition level wouldn't
> >>>> work, as
> >>>> you mentioned, because we will have to change the state on every
> >>>> mutation
> >>>> to the log i.e. at expiration of segments and append.
> >>>>
> >>>> I have been thinking whether we can do something better than the
> simple
> >>>> solution, hence the delay in replying. Let me tell you my half baked
> >>>> train
> >>>> of thoughts, perhaps, you can explore this as well. I have been
> thinking
> >>>> about using LSO (last stable offset) to handle the case when the
> >>>> partition
> >>>> never had any transactions. For a partition which never had any
> >>>> transaction, I would assume that the LSO is never initialized (or is
> >>>> equal
> >>>> to log start offset)? Or is it equal to HW in that case? This is
> >>>> something
> >>>> that I am yet to verify. If this idea works, then we would not have to
> >>>> iterate through the metadata for the dominant case where the partition
> >>>> had
> >>>> no transactions at all.
> >>>>
> >>>> --
> >>>> Divij Vaidya
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
> >>>> kamal.chandraprak...@gmail.com> wrote:
> >>>>
> >>>> > Bump. Please review this proposal.
> >>>> >
> >>>> >
> >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
> >>>> > kamal.chandraprak...@gmail.com> wrote:
> >>>> >
> >>>> > > Divij,
> >>>> > >
> >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4 review
> >>>> > > comments.
> >>>> > >
> >>>> > > > 4. Potential alternative - Instead of having an algorithm where
> we
> >>>> > > traverse
> >>>> > > across segment metadata and looking for isTxnIdxEmpty flag, should
> >>>> we
> >>>> > > directly introduce a nextSegmentWithTrxInx() function? This would
> >>>> allow
> >>>> > > implementers to optimize the otherwise linear scan across metadata
> >>>> for
> >>>> > all
> >>>> > > segments by using techniques such as skip list etc.
> >>>> > >
> >>>> > > This is a good point to optimize the scan. We need to maintain the
> >>>> > > skip-list
> >>>> > > for each leader-epoch. With unclean leader election, some brokers
> >>>> may not
> >>>> > > have
> >>>> > > the complete lineage. This will expand the scope of the work.
> >>>> > >
> >>>> > > In this version, we plan to optimize only for the below 2 cases:
> >>>> > >
> >>>> > > 1. A partition does not have the transaction index for any of the
> >>>> > uploaded
> >>>> > > segments.
> >>>> > >    The individual log segments' `isTxnIdxEmpty` flags can be reduced
> >>>> > >    to a single flag in RLMM (using the AND operator) that can serve
> >>>> > >    the query "Are all the transaction indexes empty for a partition?".
> >>>> > >    If yes, then we can directly scan the local-log for aborted
> >>>> > >    transactions.
> >>>> > > 2. A partition is produced using the transactional producer. The
> >>>> > > assumption made is that
> >>>> > >     the transaction will either commit/rollback within 15 minutes
> >>>> > >     (default transaction.max.timeout.ms = 15 mins), possibly we
> >>>> may have
> >>>> > > to search only
> >>>> > >     a few consecutive remote log segments to collect the aborted
> >>>> > > transactions.
> >>>> > > 3. A partition is being produced with both normal and
> transactional
> >>>> > > producers. In this case,
> >>>> > >     we will be doing linear traversal. Maintaining a skip-list
> might
> >>>> > > improve the performance but
> >>>> > >     we delegate the RLMM implementation to users. If implemented
> >>>> > > incorrectly, then it can lead
> >>>> > >     to delivery of the aborted transaction records to the
> consumer.
> >>>> > >
> >>>> > > I notice two drawbacks with the reduction method as proposed in
> the
> >>>> KIP:
> >>>> > >
> >>>> > > 1. Even if one segment has a transaction index, then we have to
> >>>> iterate
> >>>> > > over all the metadata events.
> >>>> > > 2. Assume that there are 10 segments and segment-5 has a txn index.
> >>>> > >    Once the first 6 segments are deleted due to a breach by
> >>>> > >    time/size/start-offset, the query "Are all the transaction indexes
> >>>> > >    empty for a partition?" should return `true`, but it will return
> >>>> > >    `false` until the broker gets restarted, and we have to resort to
> >>>> > >    iterating over all the metadata events.
> >>>> > >
> >>>> > > > 5. Potential alternative#2 - We know that we may want the
> indexes
> >>>> of
> >>>> > > multiple higher segments. Instead of fetching them sequentially,
> we
> >>>> could
> >>>> > > implement a parallel fetch or a pre-fetch for the indexes. This
> >>>> would
> >>>> > help
> >>>> > > hide the latency of sequentially fetching the trx indexes.
> >>>> > >
> >>>> > > We can implement parallel-fetch/prefetch once tiered storage is GA.
> >>>> > > This feature would also be useful for prefetching the next remote
> >>>> > > log segment, but it expands the scope of the work.
> >>>> > >
> >>>> > > > 6. Should the proposed API take "segmentId" as a parameter
> >>>> instead of
> >>>> > > "topicIdPartition"? Suggesting because isTxnIdEmpty is not a
> >>>> property of
> >>>> > a
> >>>> > > partition, instead it's a property of a specific segment.
> >>>> > >
> >>>> > > We propose to use the `topicIdPartition` in
> >>>> RemoteLogMetadataManager.
> >>>> > > The implementation can fold/reduce the value of the individual log
> >>>> > segment
> >>>> > > `isTxnIdEmpty` flag. This is added to avoid scanning all the
> >>>> metadata
> >>>> > > events
> >>>> > > when the partition does not have a transaction index in any of the
> >>>> > > segments.
> >>>> > >
> >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <
> >>>> divijvaidy...@gmail.com>
> >>>> > > wrote:
> >>>> > >
> >>>> > >> Hi Kamal
> >>>> > >>
> >>>> > >> Thanks for bringing this up. This is a problem worth solving. We
> >>>> have
> >>>> > >> faced
> >>>> > >> this in situations where some Kafka clients default to
> >>>> read_committed
> >>>> > mode
> >>>> > >> and end up having high latencies for remote fetches due to this
> >>>> > traversal
> >>>> > >> across all segments.
> >>>> > >>
> >>>> > >> First some nits to clarify the KIP:
> >>>> > >> 1. The motivation should make it clear that traversal of all
> >>>> segments is
> >>>> > >> only in the worst case. If I am not mistaken (please correct me
> if
> >>>> > wrong),
> >>>> > >> the traversal stops when it has found a segment containing LSO.
> >>>> > >> 2. There is nothing like a non-txn topic. A transaction may be
> >>>> started
> >>>> > on
> >>>> > >> any topic. Perhaps, rephrase the statement in the KIP so that it
> is
> >>>> > clear
> >>>> > >> to the reader.
> >>>> > >> 3. The hyperlink in the "the broker has to traverse all the..."
> >>>> seems
> >>>> > >> incorrect. Did you want to point to
> >>>> > >>
> >>>> > >>
> >>>> >
> >>>>
> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> >>>> > >> ?
> >>>> > >> 4. In the testing section, could we add a test plan? For
> example, I
> >>>> > would
> >>>> > >> list down adding a test which would verify the number of calls
> >>>> made to
> >>>> > >> RLMM. This test would have a higher number of calls earlier vs.
> >>>> after
> >>>> > this
> >>>> > >> KIP.
> >>>> > >>
> >>>> > >> Other thoughts:
> >>>> > >> 4. Potential alternative - Instead of having an algorithm where
> we
> >>>> > >> traverse
> >>>> > >> across segment metadata and looking for isTxnIdxEmpty flag,
> should
> >>>> we
> >>>> > >> directly introduce a nextSegmentWithTrxInx() function? This would
> >>>> allow
> >>>> > >> implementers to optimize the otherwise linear scan across
> metadata
> >>>> for
> >>>> > all
> >>>> > >> segments by using techniques such as skip list etc.
> >>>> > >> 5. Potential alternative#2 - We know that we may want the indexes
> >>>> of
> >>>> > >> multiple higher segments. Instead of fetching them sequentially,
> we
> >>>> > could
> >>>> > >> implement a parallel fetch or a pre-fetch for the indexes. This
> >>>> would
> >>>> > help
> >>>> > >> hide the latency of sequentially fetching the trx indexes.
> >>>> > >> 6. Should the proposed API take "segmentId" as a parameter
> instead
> >>>> of
> >>>> > >> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a
> >>>> property
> >>>> > of a
> >>>> > >> partition, instead it's a property of a specific segment.
> >>>> > >>
> >>>> > >> Looking forward to hearing your thoughts about the alternatives.
> >>>> Let's
> >>>> > get
> >>>> > >> this fixed.
> >>>> > >>
> >>>> > >> --
> >>>> > >> Divij Vaidya
> >>>> > >>
> >>>> > >>
> >>>> > >>
> >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> >>>> > >> kamal.chandraprak...@gmail.com> wrote:
> >>>> > >>
> >>>> > >> > Hi all,
> >>>> > >> >
> >>>> > >> > I have opened a KIP-1058
> >>>> > >> > <
> >>>> > >> >
> >>>> > >>
> >>>> >
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> >>>> > >> > >
> >>>> > >> > to reduce the pressure on remote storage when transactional
> >>>> consumers
> >>>> > >> are
> >>>> > >> > reading non-txn topics from remote storage.
> >>>> > >> >
> >>>> > >> >
> >>>> > >> >
> >>>> > >>
> >>>> >
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> >>>> > >> >
> >>>> > >> > Feedback and suggestions are welcome.
> >>>> > >> >
> >>>> > >> > Thanks,
> >>>> > >> > Kamal
> >>>> > >> >
> >>>> > >>
> >>>> > >
> >>>> >
> >>>>
> >>>
>
