Hi Christo,

Thanks for the review!

Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM helps
to
reduce the complexity of linear search. With this API, we have to:

1. Maintain one more skip-list [1] for each of the epochs in the partition
in RLMM that might
    increase the memory usage of TopicBased RLMM implementation.
    1a) The skip-list will be empty when there are no aborted txn entries
for a partition/epoch which is the predominant case.
    1b) The skip-list will act as a duplicate when *most* of the segments
have aborted txn entries, assuming aborted txn are quite low, this should
be fine.
2. Change the logic to retrieve the aborted txns (we have to query the
nextRLSMWithTxnIndex
    for each of the leader-epoch).
3. Logic divergence from how we retrieve the aborted txn entries compared
to the local-log.

The approach looks good to me. If everyone is aligned, then we can proceed
to add this API to RLMM.

Another option I was thinking of is to capture the `lastStableOffsetLag`
[2] while rotating the segment.
But, that is a bigger change we can take later.

[1]:
https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
[2]:
https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432


Thanks,
Kamal

On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com> wrote:

> Heya,
>
> Apologies for the delay. I have been thinking about this problem recently
> as well and while I believe storing a boolean in the metadata is good, I
> think we can do better by introducing a new method to the RLMM along the
> lines of
>
> Optional<RemoteLogSegmentMetadata>
> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition,
> int epochForOffset, long offset) throws RemoteStorageException
>
> This will help plugin implementers to build optimisations such as skip
> lists which will give them the next segment quicker than a linear search.
>
> I am keen to hear your thoughts!
>
> Best,
> Christo
>
> On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi Luke,
> >
> > Thanks for the review!
> >
> > > Do you think it is helpful if we store the "least abort start offset in
> > the
> > segment", and -1 means no txnIndex. So that we can have a way to know if
> we
> > need to fetch this txn index or not.
> >
> > 1. No, this change won't have an effect. To find the upper-bound offset
> > [1], we have to
> >     fetch that segment's offset index file. The RemoteIndexCache [2]
> > fetches all the 3
> >     index files together and caches them for subsequent use, so this
> > improvement
> >     won't have an effect as the current segment txn index gets downloaded
> > anyway.
> >
> > 2. The reason for choosing boolean is to make the change backward
> > compatible.
> >      There can be existing RLM events for the uploaded segments. The
> > default
> >      value of `txnIdxEmpty` is false so the *old* RLM events are assumed
> to
> > contain
> >      the txn index files and those files are downloaded if they exist.
> >
> > [1]:
> >
> >
> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
> > [2]:
> >
> >
> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
> >
> > Thanks,
> > Kamal
> >
> > On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Kamal,
> > >
> > > Sorry for the late review.
> > > Thanks for the KIP, this will improve the transaction reading for
> remote
> > > storage.
> > > Overall LGTM, just one minor thought:
> > >
> > > Currently, we only store the `TxnIndexEmpty` bool value in the segment
> > > metadata.
> > > Do you think it is helpful if we store the "least abort start offset in
> > the
> > > segment", and -1 means no txnIndex. So that we can have a way to know
> if
> > we
> > > need to fetch this txn index or not.
> > >
> > > Thanks.
> > > Luke
> > >
> > > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > If there are no more comments, I'll start a voting thread soon.
> > > >
> > > > Thanks,
> > > > Kamal
> > > >
> > > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
> > > > kamal.chandraprak...@gmail.com> wrote:
> > > >
> > > > > Bumping this thread again for review!
> > > > >
> > > > > Reduced the scope of the proposal to minimum. We will be adding
> only
> > > one
> > > > > field (txnIdxEmpty) to the
> > > > > RemoteLogSegmentMetadata event which is backward compatible. PTAL.
> > > > >
> > > > > Thanks,
> > > > > Kamal
> > > > >
> > > > >
> > > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
> > > > > kamal.chandraprak...@gmail.com> wrote:
> > > > >
> > > > >> Bumping this thread for KIP review!
> > > > >>
> > > > >> We can go for the simplest solution that is proposed in this KIP
> and
> > > > >> it can be improved in the subsequent iteration. PTAL.
> > > > >>
> > > > >> Thanks,
> > > > >> Kamal
> > > > >>
> > > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
> > > > >> kamal.chandraprak...@gmail.com> wrote:
> > > > >>
> > > > >>> Hi Divij,
> > > > >>>
> > > > >>> Thanks for the review! And, sorry for the late reply.
> > > > >>>
> > > > >>> From the UnifiedLog.scala
> > > > >>> <
> > > >
> > >
> >
> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427
> > > > >
> > > > >>> doc:
> > > > >>>
> > > > >>> """
> > > > >>> The last stable offset (LSO) is defined as the first offset such
> > that
> > > > >>> all lower offsets have been "decided."
> > > > >>>    * Non-transactional messages are considered decided
> immediately,
> > > but
> > > > >>> transactional messages are only decided when
> > > > >>>    * the corresponding COMMIT or ABORT marker is written. This
> > > implies
> > > > >>> that the last stable offset will be equal
> > > > >>>    * to the high watermark if there are no transactional messages
> > in
> > > > the
> > > > >>> log. Note also that the LSO cannot advance
> > > > >>>    * beyond the high watermark.
> > > > >>> """
> > > > >>> While rolling the active segment to passive, if LSO equals to HW,
> > > then
> > > > >>> all the messages in that segment are
> > > > >>> decided and we can store the `lastStableOffsetLag` as an
> attribute
> > in
> > > > >>> the rolled segment. We can then propagate
> > > > >>> the `lastStableOffsetLag` information in the RemoteLogMetadata
> > > events.
> > > > >>>
> > > > >>> While reading the remote log segment, if the
> `lastStableOffsetLag`
> > is
> > > > 0,
> > > > >>> then there is no need to traverse to
> > > > >>> the subsequent segments for aborted transactions which covers the
> > > case
> > > > >>> for the dominant case where the
> > > > >>> partition had no transactions at all.
> > > > >>>
> > > > >>> With Log compaction, the shrinked segments might get merged. One
> > > option
> > > > >>> is to take the max of `lastStableOffsetLag`
> > > > >>> and store it in the new LogSegment. Since, the tiered storage
> does
> > > not
> > > > >>> support compacted topics / historical compacted
> > > > >>> topics, we can omit this case.
> > > > >>>
> > > > >>> If this approach looks good, I can update the KIP with the
> details.
> > > > >>>
> > > > >>> --
> > > > >>> Kamal
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <
> > > divijvaidy...@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hi Kamal
> > > > >>>>
> > > > >>>> Thanks for the bump. I have been thinking about this passively
> for
> > > the
> > > > >>>> past
> > > > >>>> few days.
> > > > >>>>
> > > > >>>> The simplest solution is to store a state at segment level
> > metadata.
> > > > The
> > > > >>>> state should specify whether the trx index is empty or not. It
> > would
> > > > be
> > > > >>>> populated during segment archival. We would then iterate over
> the
> > > > >>>> metadata
> > > > >>>> for future segments without having to make a remote call to
> > download
> > > > the
> > > > >>>> trx index itself.
> > > > >>>>
> > > > >>>> The other solution for storing state at a partition level
> wouldn't
> > > > >>>> work, as
> > > > >>>> you mentioned, because we will have to change the state on every
> > > > >>>> mutation
> > > > >>>> to the log i.e. at expiration of segments and append.
> > > > >>>>
> > > > >>>> I have been thinking whether we can do something better than the
> > > > simple
> > > > >>>> solution, hence the delay in replying. Let me tell you my half
> > baked
> > > > >>>> train
> > > > >>>> of thoughts, perhaps, you can explore this as well. I have been
> > > > thinking
> > > > >>>> about using LSO (last stable offset) to handle the case when the
> > > > >>>> partition
> > > > >>>> never had any transactions. For a partition which never had any
> > > > >>>> transaction, I would assume that the LSO is never initialized
> (or
> > is
> > > > >>>> equal
> > > > >>>> to log start offset)? Or is it equal to HW in that case? This is
> > > > >>>> something
> > > > >>>> that I am yet to verify. If this idea works, then we would not
> > have
> > > to
> > > > >>>> iterate through the metadata for the dominant case where the
> > > partition
> > > > >>>> had
> > > > >>>> no transactions at all.
> > > > >>>>
> > > > >>>> --
> > > > >>>> Divij Vaidya
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
> > > > >>>> kamal.chandraprak...@gmail.com> wrote:
> > > > >>>>
> > > > >>>> > Bump. Please review this proposal.
> > > > >>>> >
> > > > >>>> >
> > > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
> > > > >>>> > kamal.chandraprak...@gmail.com> wrote:
> > > > >>>> >
> > > > >>>> > > Divij,
> > > > >>>> > >
> > > > >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4
> > > review
> > > > >>>> > > comments.
> > > > >>>> > >
> > > > >>>> > > > 4. Potential alternative - Instead of having an algorithm
> > > where
> > > > we
> > > > >>>> > > traverse
> > > > >>>> > > across segment metadata and looking for isTxnIdxEmpty flag,
> > > should
> > > > >>>> we
> > > > >>>> > > directly introduce a nextSegmentWithTrxInx() function? This
> > > would
> > > > >>>> allow
> > > > >>>> > > implementers to optimize the otherwise linear scan across
> > > metadata
> > > > >>>> for
> > > > >>>> > all
> > > > >>>> > > segments by using techniques such as skip list etc.
> > > > >>>> > >
> > > > >>>> > > This is a good point to optimize the scan. We need to
> maintain
> > > the
> > > > >>>> > > skip-list
> > > > >>>> > > for each leader-epoch. With unclean leader election, some
> > > brokers
> > > > >>>> may not
> > > > >>>> > > have
> > > > >>>> > > the complete lineage. This will expand the scope of the
> work.
> > > > >>>> > >
> > > > >>>> > > In this version, we plan to optimize only for the below 2
> > cases:
> > > > >>>> > >
> > > > >>>> > > 1. A partition does not have the transaction index for any
> of
> > > the
> > > > >>>> > uploaded
> > > > >>>> > > segments.
> > > > >>>> > >    The individual log segments `isTxnIdxEmpty` flag can be
> > > reduced
> > > > >>>> to a
> > > > >>>> > > single flag
> > > > >>>> > >    in RLMM (using AND operator) that can serve the query -
> "Is
> > > all
> > > > >>>> the
> > > > >>>> > > transaction indexes empty for a partition?".
> > > > >>>> > >    If yes, then we can directly scan the local-log for
> aborted
> > > > >>>> > > transactions.
> > > > >>>> > > 2. A partition is produced using the transactional producer.
> > The
> > > > >>>> > > assumption made is that
> > > > >>>> > >     the transaction will either commit/rollback within 15
> > > minutes
> > > > >>>> > >     (default transaction.max.timeout.ms = 15 mins),
> possibly
> > we
> > > > >>>> may have
> > > > >>>> > > to search only
> > > > >>>> > >     a few consecutive remote log segments to collect the
> > aborted
> > > > >>>> > > transactions.
> > > > >>>> > > 3. A partition is being produced with both normal and
> > > > transactional
> > > > >>>> > > producers. In this case,
> > > > >>>> > >     we will be doing linear traversal. Maintaining a
> skip-list
> > > > might
> > > > >>>> > > improve the performance but
> > > > >>>> > >     we delegate the RLMM implementation to users. If
> > implemented
> > > > >>>> > > incorrectly, then it can lead
> > > > >>>> > >     to delivery of the aborted transaction records to the
> > > > consumer.
> > > > >>>> > >
> > > > >>>> > > I notice two drawbacks with the reduction method as proposed
> > in
> > > > the
> > > > >>>> KIP:
> > > > >>>> > >
> > > > >>>> > > 1. Even if one segment has a transaction index, then we have
> > to
> > > > >>>> iterate
> > > > >>>> > > over all the metadata events.
> > > > >>>> > > 2. Assume that there are 10 segments and segment-5 has a txn
> > > > index.
> > > > >>>> Once
> > > > >>>> > > the first 6 segments are deleted,
> > > > >>>> > >     due to breach by time/size/start-offset, then we should
> > > return
> > > > >>>> `true`
> > > > >>>> > > for "Is all the transaction indexes empty for a partition?"
> > > > >>>> > >    query but it will return `false` until the broker gets
> > > > restarted
> > > > >>>> and
> > > > >>>> > we
> > > > >>>> > > have to resort to iterate over all the metadata events.
> > > > >>>> > >
> > > > >>>> > > > 5. Potential alternative#2 - We know that we may want the
> > > > indexes
> > > > >>>> of
> > > > >>>> > > multiple higher segments. Instead of fetching them
> > sequentially,
> > > > we
> > > > >>>> could
> > > > >>>> > > implement a parallel fetch or a pre-fetch for the indexes.
> > This
> > > > >>>> would
> > > > >>>> > help
> > > > >>>> > > hide the latency of sequentially fetching the trx indexes.
> > > > >>>> > >
> > > > >>>> > > We can implement parallel-fetch/prefetch once the tiered
> > storage
> > > > is
> > > > >>>> GAed.
> > > > >>>> > > Since this feature will be useful
> > > > >>>> > > to prefetch the next remote log segment and it expands the
> > scope
> > > > of
> > > > >>>> the
> > > > >>>> > > work.
> > > > >>>> > >
> > > > >>>> > > > 6. Should the proposed API take "segmentId" as a parameter
> > > > >>>> instead of
> > > > >>>> > > "topicIdPartition"? Suggesting because isTxnIdEmpty is not a
> > > > >>>> property of
> > > > >>>> > a
> > > > >>>> > > partition, instead it's a property of a specific segment.
> > > > >>>> > >
> > > > >>>> > > We propose to use the `topicIdPartition` in
> > > > >>>> RemoteLogMetadataManager.
> > > > >>>> > > The implementation can fold/reduce the value of the
> individual
> > > log
> > > > >>>> > segment
> > > > >>>> > > `isTxnIdEmpty` flag. This is added to avoid scanning all the
> > > > >>>> metadata
> > > > >>>> > > events
> > > > >>>> > > when the partition does not have a transaction index in any
> of
> > > the
> > > > >>>> > > segments.
> > > > >>>> > >
> > > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <
> > > > >>>> divijvaidy...@gmail.com>
> > > > >>>> > > wrote:
> > > > >>>> > >
> > > > >>>> > >> Hi Kamal
> > > > >>>> > >>
> > > > >>>> > >> Thanks for bringing this up. This is a problem worth
> solving.
> > > We
> > > > >>>> have
> > > > >>>> > >> faced
> > > > >>>> > >> this in situations where some Kafka clients default to
> > > > >>>> read_committed
> > > > >>>> > mode
> > > > >>>> > >> and end up having high latencies for remote fetches due to
> > this
> > > > >>>> > traversal
> > > > >>>> > >> across all segments.
> > > > >>>> > >>
> > > > >>>> > >> First some nits to clarify the KIP:
> > > > >>>> > >> 1. The motivation should make it clear that traversal of
> all
> > > > >>>> segments is
> > > > >>>> > >> only in the worst case. If I am not mistaken (please
> correct
> > me
> > > > if
> > > > >>>> > wrong),
> > > > >>>> > >> the traversal stops when it has found a segment containing
> > LSO.
> > > > >>>> > >> 2. There is nothing like a non-txn topic. A transaction may
> > be
> > > > >>>> started
> > > > >>>> > on
> > > > >>>> > >> any topic. Perhaps, rephrase the statement in the KIP so
> that
> > > it
> > > > is
> > > > >>>> > clear
> > > > >>>> > >> to the reader.
> > > > >>>> > >> 3. The hyperlink in the "the broker has to traverse all
> > the..."
> > > > >>>> seems
> > > > >>>> > >> incorrect. Did you want to point to
> > > > >>>> > >>
> > > > >>>> > >>
> > > > >>>> >
> > > > >>>>
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> > > > >>>> > >> ?
> > > > >>>> > >> 4. In the testing section, could we add a test plan? For
> > > > example, I
> > > > >>>> > would
> > > > >>>> > >> list down adding a test which would verify the number of
> > calls
> > > > >>>> made to
> > > > >>>> > >> RLMM. This test would have a higher number of calls earlier
> > vs.
> > > > >>>> after
> > > > >>>> > this
> > > > >>>> > >> KIP.
> > > > >>>> > >>
> > > > >>>> > >> Other thoughts:
> > > > >>>> > >> 4. Potential alternative - Instead of having an algorithm
> > where
> > > > we
> > > > >>>> > >> traverse
> > > > >>>> > >> across segment metadata and looking for isTxnIdxEmpty flag,
> > > > should
> > > > >>>> we
> > > > >>>> > >> directly introduce a nextSegmentWithTrxInx() function? This
> > > would
> > > > >>>> allow
> > > > >>>> > >> implementers to optimize the otherwise linear scan across
> > > > metadata
> > > > >>>> for
> > > > >>>> > all
> > > > >>>> > >> segments by using techniques such as skip list etc.
> > > > >>>> > >> 5. Potential alternative#2 - We know that we may want the
> > > indexes
> > > > >>>> of
> > > > >>>> > >> multiple higher segments. Instead of fetching them
> > > sequentially,
> > > > we
> > > > >>>> > could
> > > > >>>> > >> implement a parallel fetch or a pre-fetch for the indexes.
> > This
> > > > >>>> would
> > > > >>>> > help
> > > > >>>> > >> hide the latency of sequentially fetching the trx indexes.
> > > > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter
> > > > instead
> > > > >>>> of
> > > > >>>> > >> "topicIdPartition"? Suggesting because isTxnIdEmpty is not
> a
> > > > >>>> property
> > > > >>>> > of a
> > > > >>>> > >> partition, instead it's a property of a specific segment.
> > > > >>>> > >>
> > > > >>>> > >> Looking forward to hearing your thoughts about the
> > > alternatives.
> > > > >>>> Let's
> > > > >>>> > get
> > > > >>>> > >> this fixed.
> > > > >>>> > >>
> > > > >>>> > >> --
> > > > >>>> > >> Divij Vaidya
> > > > >>>> > >>
> > > > >>>> > >>
> > > > >>>> > >>
> > > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> > > > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
> > > > >>>> > >>
> > > > >>>> > >> > Hi all,
> > > > >>>> > >> >
> > > > >>>> > >> > I have opened a KIP-1058
> > > > >>>> > >> > <
> > > > >>>> > >> >
> > > > >>>> > >>
> > > > >>>> >
> > > > >>>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > > > >>>> > >> > >
> > > > >>>> > >> > to reduce the pressure on remote storage when
> transactional
> > > > >>>> consumers
> > > > >>>> > >> are
> > > > >>>> > >> > reading non-txn topics from remote storage.
> > > > >>>> > >> >
> > > > >>>> > >> >
> > > > >>>> > >> >
> > > > >>>> > >>
> > > > >>>> >
> > > > >>>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > > > >>>> > >> >
> > > > >>>> > >> > Feedbacks and suggestions are welcome.
> > > > >>>> > >> >
> > > > >>>> > >> > Thanks,
> > > > >>>> > >> > Kamal
> > > > >>>> > >> >
> > > > >>>> > >>
> > > > >>>> > >
> > > > >>>> >
> > > > >>>>
> > > > >>>
> > > >
> > >
> >
>

Reply via email to