Hi Luke,

Thanks for the review!

> Do you think it is helpful if we store the "least abort start offset in the
> segment", and -1 means no txnIndex. So that we can have a way to know if we
> need to fetch this txn index or not.

1. No, this change won't help. To find the upper-bound offset [1], we have
   to fetch that segment's offset index file anyway. The RemoteIndexCache [2]
   fetches all 3 index files together and caches them for subsequent use, so
   the current segment's txn index gets downloaded regardless of what we
   store in the metadata.

2. The reason for choosing a boolean is to keep the change backward
   compatible. There can be existing RLM events for already-uploaded
   segments. The default value of `txnIdxEmpty` is false, so the *old* RLM
   events are assumed to contain the txn index files, and those files are
   downloaded if they exist.
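A minimal Java sketch of the backward-compatibility argument in point 2, assuming a hypothetical decoded event type `SegmentMetadataEvent` in which events written before the change carry no `txnIdxEmpty` value. These names are illustrative only and are not Kafka's actual metadata or serde classes.

```java
import java.util.Optional;

public class TxnIdxEmptyCompat {

    // Hypothetical decoded view of a RemoteLogSegmentMetadata event. Events written
    // before the field existed have no value for txnIdxEmpty (Optional.empty()).
    record SegmentMetadataEvent(String segmentId, Optional<Boolean> txnIdxEmptyField) { }

    // A missing field falls back to false, i.e. "this segment may have a txn index".
    static boolean txnIdxEmpty(SegmentMetadataEvent event) {
        return event.txnIdxEmptyField().orElse(false);
    }

    // Old events therefore keep the existing behaviour (try to download the txn index);
    // only events that explicitly carry true skip the remote call.
    static boolean shouldFetchTxnIndex(SegmentMetadataEvent event) {
        return !txnIdxEmpty(event);
    }
}
```

With a default of false, the worst case for an old event is the current behaviour (one extra index download), never a skipped index that actually holds aborted transactions.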

[1]:
https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
[2]:
https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383

Thanks,
Kamal

On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:

> Hi Kamal,
>
> Sorry for the late review.
> Thanks for the KIP, this will improve the transaction reading for remote
> storage.
> Overall LGTM, just one minor thought:
>
> Currently, we only store the `TxnIndexEmpty` bool value in the segment
> metadata.
> Do you think it is helpful if we store the "least abort start offset in the
> segment", and -1 means no txnIndex. So that we can have a way to know if we
> need to fetch this txn index or not.
>
> Thanks.
> Luke
>
> On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi all,
> >
> > If there are no more comments, I'll start a voting thread soon.
> >
> > Thanks,
> > Kamal
> >
> > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Bumping this thread again for review!
> > >
> > > Reduced the scope of the proposal to the minimum. We will be adding only
> > > one field (txnIdxEmpty) to the RemoteLogSegmentMetadata event, which is
> > > backward compatible. PTAL.
> > >
> > > Thanks,
> > > Kamal
> > >
> > >
> > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > >> Bumping this thread for KIP review!
> > >>
> > >> We can go for the simplest solution that is proposed in this KIP and
> > >> it can be improved in the subsequent iteration. PTAL.
> > >>
> > >> Thanks,
> > >> Kamal
> > >>
> > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
> > >> kamal.chandraprak...@gmail.com> wrote:
> > >>
> > >>> Hi Divij,
> > >>>
> > >>> Thanks for the review! And, sorry for the late reply.
> > >>>
> > >>> From the UnifiedLog.scala doc
> > >>> <https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427>:
> > >>>
> > >>> """
> > >>> The last stable offset (LSO) is defined as the first offset such that
> > >>> all lower offsets have been "decided."
> > >>>    * Non-transactional messages are considered decided immediately, but
> > >>>    * transactional messages are only decided when the corresponding
> > >>>    * COMMIT or ABORT marker is written. This implies that the last
> > >>>    * stable offset will be equal to the high watermark if there are no
> > >>>    * transactional messages in the log. Note also that the LSO cannot
> > >>>    * advance beyond the high watermark.
> > >>> """
> > >>> While rolling the active segment to passive, if the LSO equals the HW,
> > >>> then all the messages in that segment are decided, and we can store the
> > >>> `lastStableOffsetLag` as an attribute of the rolled segment. We can then
> > >>> propagate the `lastStableOffsetLag` information in the
> > >>> RemoteLogMetadata events.
> > >>>
> > >>> While reading the remote log segment, if the `lastStableOffsetLag` is 0,
> > >>> then there is no need to traverse the subsequent segments for aborted
> > >>> transactions. This covers the dominant case where the partition had no
> > >>> transactions at all.
> > >>>
> > >>> With log compaction, the shrunk segments might get merged. One option
> > >>> is to take the max of the `lastStableOffsetLag` values and store it in
> > >>> the new LogSegment. Since tiered storage does not support compacted
> > >>> topics or historically compacted topics, we can omit this case.
> > >>>
> > >>> If this approach looks good, I can update the KIP with the details.
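A minimal Java sketch of the `lastStableOffsetLag` idea, assuming the lag is computed as high watermark minus LSO at the moment the active segment is rolled, and that a merged (compacted) segment keeps the max lag of its inputs as suggested. `RolledSegment` is a hypothetical type, not an existing Kafka class.

```java
import java.util.List;

public class LsoLagSketch {

    // Hypothetical per-segment attribute from the proposal, captured when the active
    // segment is rolled. Assumes lag = high watermark - last stable offset.
    record RolledSegment(long baseOffset, long lastStableOffsetLag) { }

    static long lastStableOffsetLag(long highWatermark, long lastStableOffset) {
        return highWatermark - lastStableOffset;
    }

    // Lag 0 at roll time: every record in the segment was decided, so the search for
    // aborted transactions does not need to continue into subsequent segments.
    static boolean needsSubsequentSegments(RolledSegment segment) {
        return segment.lastStableOffsetLag() != 0L;
    }

    // Option mentioned for compaction: a merged segment keeps the max lag of its inputs.
    static long mergedLag(List<RolledSegment> mergedInputs) {
        return mergedInputs.stream()
                .mapToLong(RolledSegment::lastStableOffsetLag)
                .max()
                .orElse(0L);
    }
}
```

Taking the max is the conservative choice: if any input segment had undecided records at roll time, the merged segment is never treated as fully decided.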
> > >>>
> > >>> --
> > >>> Kamal
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Kamal
> > >>>>
> > >>>> Thanks for the bump. I have been thinking about this passively for the
> > >>>> past few days.
> > >>>>
> > >>>> The simplest solution is to store a state in the segment-level metadata.
> > >>>> The state should specify whether the trx index is empty or not. It would
> > >>>> be populated during segment archival. We would then iterate over the
> > >>>> metadata for subsequent segments without having to make a remote call to
> > >>>> download the trx index itself.
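The per-segment approach described here could look roughly like the following Java sketch. It is an illustration under assumptions, not Kafka's RemoteLogManager code: `SegmentMeta`, `AbortedTxn` and the `fetchTxnIndex` callback (standing in for the remote download) are hypothetical, and the early exit that the real search takes once its result is complete is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SkipEmptyTxnIndexes {

    // Hypothetical stand-ins for remote segment metadata and an aborted-txn entry.
    record SegmentMeta(long baseOffset, long endOffset, boolean txnIdxEmpty) { }
    record AbortedTxn(long producerId, long firstOffset, long lastOffset) { }

    // Walks segment metadata in offset order. Segments that end before fetchOffset, or
    // whose metadata says the txn index is empty, are skipped without calling
    // fetchTxnIndex. Entries overlapping [fetchOffset, upperBoundOffset) are collected.
    static List<AbortedTxn> collectAbortedTxns(List<SegmentMeta> segments,
                                               long fetchOffset,
                                               long upperBoundOffset,
                                               Function<SegmentMeta, List<AbortedTxn>> fetchTxnIndex) {
        List<AbortedTxn> result = new ArrayList<>();
        for (SegmentMeta segment : segments) {
            if (segment.endOffset() < fetchOffset || segment.txnIdxEmpty()) {
                continue; // nothing relevant to download for this segment
            }
            for (AbortedTxn txn : fetchTxnIndex.apply(segment)) {
                if (txn.lastOffset() >= fetchOffset && txn.firstOffset() < upperBoundOffset) {
                    result.add(txn);
                }
            }
        }
        return result;
    }
}
```

The metadata is still iterated linearly; what changes is that only segments with a non-empty txn index cost a remote call.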
> > >>>>
> > >>>> The other solution, storing state at the partition level, wouldn't work,
> > >>>> as you mentioned, because we would have to change the state on every
> > >>>> mutation to the log, i.e. on segment expiration and on append.
> > >>>>
> > >>>> I have been thinking about whether we can do something better than the
> > >>>> simple solution, hence the delay in replying. Let me share my half-baked
> > >>>> train of thought; perhaps you can explore this as well. I have been
> > >>>> thinking about using the LSO (last stable offset) to handle the case
> > >>>> where the partition never had any transactions. For a partition that
> > >>>> never had any transaction, I would assume that the LSO is never
> > >>>> initialized (or is equal to the log start offset)? Or is it equal to the
> > >>>> HW in that case? This is something that I have yet to verify. If this
> > >>>> idea works, then we would not have to iterate through the metadata for
> > >>>> the dominant case where the partition had no transactions at all.
> > >>>>
> > >>>> --
> > >>>> Divij Vaidya
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
> > >>>> kamal.chandraprak...@gmail.com> wrote:
> > >>>>
> > >>>> > Bump. Please review this proposal.
> > >>>> >
> > >>>> >
> > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
> > >>>> > kamal.chandraprak...@gmail.com> wrote:
> > >>>> >
> > >>>> > > Divij,
> > >>>> > >
> > >>>> > > Thanks for the review! I have updated the KIP to address review
> > >>>> > > comments 1, 2, 3, and 4.
> > >>>> > >
> > >>>> > > > 4. Potential alternative - Instead of having an algorithm where we
> > >>>> > > > traverse across segment metadata looking for the isTxnIdxEmpty
> > >>>> > > > flag, should we directly introduce a nextSegmentWithTrxInx()
> > >>>> > > > function? This would allow implementers to optimize the otherwise
> > >>>> > > > linear scan across metadata for all segments by using techniques
> > >>>> > > > such as skip lists etc.
> > >>>> > >
> > >>>> > > This is a good point for optimizing the scan. We would need to
> > >>>> > > maintain the skip list for each leader epoch. With unclean leader
> > >>>> > > election, some brokers may not have the complete lineage. This will
> > >>>> > > expand the scope of the work.
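For illustration only, a skip-list-backed lookup in the spirit of the proposed nextSegmentWithTrxInx() might look like the Java sketch below. `TxnIndexSkipList` and `nextSegmentWithTxnIndex` are hypothetical names; the sketch keeps a single set per partition and deliberately ignores the per-leader-epoch lineage problem raised in this reply, which is exactly what makes the real version harder.

```java
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentSkipListSet;

public class TxnIndexSkipList {

    // Base offsets of uploaded segments whose txn index is non-empty. One set per
    // partition is assumed; leader-epoch lineage is not modelled here.
    private final ConcurrentSkipListSet<Long> segmentsWithTxnIndex = new ConcurrentSkipListSet<>();

    void onSegmentUploaded(long baseOffset, boolean txnIdxEmpty) {
        if (!txnIdxEmpty) {
            segmentsWithTxnIndex.add(baseOffset);
        }
    }

    void onSegmentDeleted(long baseOffset) {
        segmentsWithTxnIndex.remove(baseOffset);
    }

    // First segment at or after fromBaseOffset that has a txn index, found in
    // O(log n) via ceiling() instead of a linear scan over all segment metadata.
    OptionalLong nextSegmentWithTxnIndex(long fromBaseOffset) {
        Long next = segmentsWithTxnIndex.ceiling(fromBaseOffset);
        return next == null ? OptionalLong.empty() : OptionalLong.of(next);
    }
}
```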
> > >>>> > >
> > >>>> > > In this version, we plan to optimize only the first two of the
> > >>>> > > following three cases:
> > >>>> > >
> > >>>> > > 1. A partition does not have a transaction index for any of the
> > >>>> > >    uploaded segments. The individual log segments' `isTxnIdxEmpty`
> > >>>> > >    flags can be reduced to a single flag in RLMM (using the AND
> > >>>> > >    operator) that can serve the query "Are all the transaction
> > >>>> > >    indexes empty for a partition?". If yes, then we can directly
> > >>>> > >    scan the local log for aborted transactions.
> > >>>> > > 2. A partition is produced to using the transactional producer. The
> > >>>> > >    assumption is that a transaction will either commit or roll back
> > >>>> > >    within 15 minutes (default transaction.max.timeout.ms = 15 mins),
> > >>>> > >    so we may have to search only a few consecutive remote log
> > >>>> > >    segments to collect the aborted transactions.
> > >>>> > > 3. A partition is being produced to by both normal and transactional
> > >>>> > >    producers. In this case, we will be doing a linear traversal.
> > >>>> > >    Maintaining a skip list might improve performance, but we
> > >>>> > >    delegate the RLMM implementation to users. If implemented
> > >>>> > >    incorrectly, it can lead to aborted transaction records being
> > >>>> > >    delivered to the consumer.
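Case 1 can be sketched in Java as an AND-reduction over the per-segment flags that picks where the aborted-transaction scan runs. `SegmentMeta` and `ScanPlan` are hypothetical types for illustration; they are not RLMM's actual interface.

```java
import java.util.List;

public class TxnIndexReduction {

    // Hypothetical minimal view of an uploaded segment's metadata.
    record SegmentMeta(long baseOffset, boolean txnIdxEmpty) { }

    enum ScanPlan { LOCAL_LOG_ONLY, REMOTE_THEN_LOCAL }

    // AND-reduce the per-segment flags: true only if no uploaded segment has a txn index.
    static boolean allTxnIndexesEmpty(List<SegmentMeta> uploadedSegments) {
        return uploadedSegments.stream().allMatch(SegmentMeta::txnIdxEmpty);
    }

    // If every remote txn index is empty, only the local log needs scanning for aborted
    // transactions; otherwise fall back to walking remote segments, then local ones.
    static ScanPlan planAbortedTxnScan(List<SegmentMeta> uploadedSegments) {
        return allTxnIndexesEmpty(uploadedSegments)
                ? ScanPlan.LOCAL_LOG_ONLY
                : ScanPlan.REMOTE_THEN_LOCAL;
    }
}
```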
> > >>>> > >
> > >>>> > > I notice two drawbacks with the reduction method as proposed in the
> > >>>> > > KIP:
> > >>>> > >
> > >>>> > > 1. Even if only one segment has a transaction index, we have to
> > >>>> > >    iterate over all the metadata events.
> > >>>> > > 2. Assume that there are 10 segments and segment-5 has a txn index.
> > >>>> > >    Once the first 6 segments are deleted due to a breach by
> > >>>> > >    time/size/start-offset, we should return `true` for the query
> > >>>> > >    "Are all the transaction indexes empty for a partition?", but it
> > >>>> > >    will return `false` until the broker gets restarted, and we have
> > >>>> > >    to resort to iterating over all the metadata events.
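Drawback 2 can be reproduced with a small Java sketch, assuming the reduced flag is folded with AND on upload and never re-derived on deletion. `ReducedFlagStaleness` is a hypothetical class that tracks live segments in a `TreeMap` keyed by base offset.

```java
import java.util.TreeMap;

public class ReducedFlagStaleness {

    // Live segments: base offset -> txnIdxEmpty.
    private final TreeMap<Long, Boolean> liveSegments = new TreeMap<>();
    // Flag folded incrementally with AND on upload; never re-derived.
    private boolean cachedAllEmpty = true;

    void onUpload(long baseOffset, boolean txnIdxEmpty) {
        liveSegments.put(baseOffset, txnIdxEmpty);
        cachedAllEmpty &= txnIdxEmpty;
    }

    void onDelete(long baseOffset) {
        // Removing a segment cannot undo the AND, so the cached flag stays stale.
        liveSegments.remove(baseOffset);
    }

    boolean cachedAllEmpty() {
        return cachedAllEmpty;
    }

    // Recomputing over live segments is correct but iterates all metadata (drawback 1).
    boolean recomputedAllEmpty() {
        return liveSegments.values().stream().allMatch(Boolean::booleanValue);
    }
}
```

Uploading 10 segments where only segment-5 has a txn index and then deleting the first 6 leaves `cachedAllEmpty()` at false while `recomputedAllEmpty()` returns true, which is the mismatch described in point 2.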
> > >>>> > >
> > >>>> > > > 5. Potential alternative#2 - We know that we may want the indexes
> > >>>> > > > of multiple higher segments. Instead of fetching them
> > >>>> > > > sequentially, we could implement a parallel fetch or a pre-fetch
> > >>>> > > > for the indexes. This would help hide the latency of sequentially
> > >>>> > > > fetching the trx indexes.
> > >>>> > >
> > >>>> > > We can implement parallel fetch/prefetch once tiered storage is GA.
> > >>>> > > That feature would also be useful for prefetching the next remote
> > >>>> > > log segment, and it expands the scope of this work.
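The parallel fetch/prefetch alternative (deferred in this thread) could be sketched in Java with `CompletableFuture`, assuming a hypothetical blocking `fetchTxnIndex` call and a caller-supplied thread pool. Results are joined in segment order so that a caller could still stop at the first segment whose search is complete.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelTxnIndexFetch {

    // Starts fetches for the next `window` segments concurrently instead of one by one,
    // then waits for each in segment order. fetchTxnIndex is a hypothetical remote call.
    static <S, I> List<I> prefetch(List<S> nextSegments, int window,
                                   Function<S, I> fetchTxnIndex, ExecutorService pool) {
        List<CompletableFuture<I>> futures = nextSegments.stream()
                .limit(window)
                .map(segment -> CompletableFuture.supplyAsync(() -> fetchTxnIndex.apply(segment), pool))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```

The window size bounds how many indexes may be downloaded speculatively; with a window of 1 this degenerates to today's sequential fetch.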
> > >>>> > >
> > >>>> > > > 6. Should the proposed API take "segmentId" as a parameter instead
> > >>>> > > > of "topicIdPartition"? Suggesting because isTxnIdxEmpty is not a
> > >>>> > > > property of a partition; instead, it's a property of a specific
> > >>>> > > > segment.
> > >>>> > >
> > >>>> > > We propose to use the `topicIdPartition` in RemoteLogMetadataManager.
> > >>>> > > The implementation can fold/reduce the values of the individual log
> > >>>> > > segments' `isTxnIdxEmpty` flags. This is added to avoid scanning all
> > >>>> > > the metadata events when the partition does not have a transaction
> > >>>> > > index in any of its segments.
> > >>>> > >
> > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com>
> > >>>> > > wrote:
> > >>>> > >
> > >>>> > >> Hi Kamal
> > >>>> > >>
> > >>>> > >> Thanks for bringing this up. This is a problem worth solving. We have
> > >>>> > >> faced this in situations where some Kafka clients default to
> > >>>> > >> read_committed mode and end up having high latencies for remote
> > >>>> > >> fetches due to this traversal across all segments.
> > >>>> > >>
> > >>>> > >> First, some nits to clarify the KIP:
> > >>>> > >> 1. The motivation should make it clear that traversal of all segments
> > >>>> > >> happens only in the worst case. If I am not mistaken (please correct
> > >>>> > >> me if I'm wrong), the traversal stops when it has found a segment
> > >>>> > >> containing the LSO.
> > >>>> > >> 2. There is no such thing as a non-txn topic. A transaction may be
> > >>>> > >> started on any topic. Perhaps rephrase the statement in the KIP so
> > >>>> > >> that it is clear to the reader.
> > >>>> > >> 3. The hyperlink in "the broker has to traverse all the..." seems
> > >>>> > >> incorrect. Did you mean to point to
> > >>>> > >> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> > >>>> > >> ?
> > >>>> > >> 4. In the testing section, could we add a test plan? For example, I
> > >>>> > >> would list a test that verifies the number of calls made to RLMM.
> > >>>> > >> This test would show a higher number of calls before this KIP than
> > >>>> > >> after it.
> > >>>> > >>
> > >>>> > >> Other thoughts:
> > >>>> > >> 4. Potential alternative - Instead of having an algorithm where we
> > >>>> > >> traverse across segment metadata looking for the isTxnIdxEmpty flag,
> > >>>> > >> should we directly introduce a nextSegmentWithTrxInx() function? This
> > >>>> > >> would allow implementers to optimize the otherwise linear scan across
> > >>>> > >> metadata for all segments by using techniques such as skip lists etc.
> > >>>> > >> 5. Potential alternative#2 - We know that we may want the indexes of
> > >>>> > >> multiple higher segments. Instead of fetching them sequentially, we
> > >>>> > >> could implement a parallel fetch or a pre-fetch for the indexes. This
> > >>>> > >> would help hide the latency of sequentially fetching the trx indexes.
> > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter instead of
> > >>>> > >> "topicIdPartition"? Suggesting because isTxnIdxEmpty is not a
> > >>>> > >> property of a partition; instead, it's a property of a specific
> > >>>> > >> segment.
> > >>>> > >>
> > >>>> > >> Looking forward to hearing your thoughts about the alternatives. Let's
> > >>>> > >> get this fixed.
> > >>>> > >>
> > >>>> > >> --
> > >>>> > >> Divij Vaidya
> > >>>> > >>
> > >>>> > >>
> > >>>> > >>
> > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
> > >>>> > >>
> > >>>> > >> > Hi all,
> > >>>> > >> >
> > >>>> > >> > I have opened KIP-1058
> > >>>> > >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic>
> > >>>> > >> > to reduce the pressure on remote storage when transactional
> > >>>> > >> > consumers are reading non-txn topics from remote storage.
> > >>>> > >> >
> > >>>> > >> > Feedback and suggestions are welcome.
> > >>>> > >> >
> > >>>> > >> > Thanks,
> > >>>> > >> > Kamal
> > >>>> > >> >
> > >>>> > >>
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>
> >
>
