Heya,

Apologies for the delay. I have been thinking about this problem recently
as well and while I believe storing a boolean in the metadata is good, I
think we can do better by introducing a new method to the RLMM along the
lines of

Optional<RemoteLogSegmentMetadata>
nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition,
int epochForOffset, long offset) throws RemoteStorageException

This will help plugin implementers to build optimisations such as skip
lists which will give them the next segment quicker than a linear search.
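
As a rough illustration only, a plugin could keep a sorted index of the
segments whose txn index is non-empty and answer the lookup without a
linear scan. The sketch below is an assumption, not part of the KIP:
SegmentMeta and TxnSegmentIndex are hypothetical stand-ins rather than
Kafka classes, and it ignores leader-epoch lineage entirely, which a real
RLMM implementation would have to handle.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for RemoteLogSegmentMetadata; only the fields this sketch needs.
final class SegmentMeta {
    final long startOffset;
    final long endOffset;
    final boolean txnIdxEmpty;

    SegmentMeta(long startOffset, long endOffset, boolean txnIdxEmpty) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.txnIdxEmpty = txnIdxEmpty;
    }
}

// Hypothetical per-partition index holding only segments with a non-empty txn index.
final class TxnSegmentIndex {
    // Keyed by segment end offset, so ceilingEntry(offset) yields the first
    // indexed segment that ends at or after the requested offset.
    private final ConcurrentSkipListMap<Long, SegmentMeta> byEndOffset =
            new ConcurrentSkipListMap<>();

    // Called when a segment upload completes; segments without aborted txns are skipped.
    void onSegmentUploaded(SegmentMeta meta) {
        if (!meta.txnIdxEmpty) {
            byEndOffset.put(meta.endOffset, meta);
        }
    }

    // Called when a segment is deleted from remote storage.
    void onSegmentDeleted(SegmentMeta meta) {
        byEndOffset.remove(meta.endOffset);
    }

    // Logarithmic-time lookup instead of walking every segment's metadata.
    Optional<SegmentMeta> nextWithTxnIndex(long offset) {
        Map.Entry<Long, SegmentMeta> entry = byEndOffset.ceilingEntry(offset);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

Keying by end offset keeps the lookup to a single ceilingEntry call; the
broker would keep calling it with the returned segment's end offset + 1
until it gets an empty result or passes the upper-bound offset.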

I am keen to hear your thoughts!

Best,
Christo

On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi Luke,
>
> Thanks for the review!
>
> > Do you think it is helpful if we store the "least abort start offset in the
> > segment", and -1 means no txnIndex. So that we can have a way to know if we
> > need to fetch this txn index or not.
>
> 1. No, this change won't have an effect. To find the upper-bound offset [1],
>    we have to fetch that segment's offset index file. The RemoteIndexCache [2]
>    fetches all the 3 index files together and caches them for subsequent use,
>    so this improvement won't have an effect as the current segment txn index
>    gets downloaded anyway.
>
> 2. The reason for choosing boolean is to make the change backward compatible.
>    There can be existing RLM events for the uploaded segments. The default
>    value of `txnIdxEmpty` is false so the *old* RLM events are assumed to
>    contain the txn index files and those files are downloaded if they exist.
>
> [1]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
> [2]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
>
> Thanks,
> Kamal
>
> On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Kamal,
> >
> > Sorry for the late review.
> > Thanks for the KIP, this will improve the transaction reading for remote
> > storage.
> > Overall LGTM, just one minor thought:
> >
> > Currently, we only store the `TxnIndexEmpty` bool value in the segment
> > metadata.
> > Do you think it is helpful if we store the "least abort start offset in the
> > segment", and -1 means no txnIndex. So that we can have a way to know if we
> > need to fetch this txn index or not.
> >
> > Thanks.
> > Luke
> >
> > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > If there are no more comments, I'll start a voting thread soon.
> > >
> > > Thanks,
> > > Kamal
> > >
> > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Bumping this thread again for review!
> > > >
> > > > Reduced the scope of the proposal to minimum. We will be adding only
> > one
> > > > field (txnIdxEmpty) to the
> > > > RemoteLogSegmentMetadata event which is backward compatible. PTAL.
> > > >
> > > > Thanks,
> > > > Kamal
> > > >
> > > >
> > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
> > > > kamal.chandraprak...@gmail.com> wrote:
> > > >
> > > >> Bumping this thread for KIP review!
> > > >>
> > > >> We can go for the simplest solution that is proposed in this KIP and
> > > >> it can be improved in the subsequent iteration. PTAL.
> > > >>
> > > >> Thanks,
> > > >> Kamal
> > > >>
> > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
> > > >> kamal.chandraprak...@gmail.com> wrote:
> > > >>
> > > >>> Hi Divij,
> > > >>>
> > > >>> Thanks for the review! And, sorry for the late reply.
> > > >>>
> > > >>> From the UnifiedLog.scala
> > > >>> <
> > >
> >
> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427
> > > >
> > > >>> doc:
> > > >>>
> > > >>> """
> > > >>> The last stable offset (LSO) is defined as the first offset such
> that
> > > >>> all lower offsets have been "decided."
> > > >>>    * Non-transactional messages are considered decided immediately,
> > but
> > > >>> transactional messages are only decided when
> > > >>>    * the corresponding COMMIT or ABORT marker is written. This
> > implies
> > > >>> that the last stable offset will be equal
> > > >>>    * to the high watermark if there are no transactional messages
> in
> > > the
> > > >>> log. Note also that the LSO cannot advance
> > > >>>    * beyond the high watermark.
> > > >>> """
> > > > >>> While rolling the active segment to passive, if the LSO equals the HW,
> > > > >>> then all the messages in that segment are decided and we can store the
> > > > >>> `lastStableOffsetLag` as an attribute in the rolled segment. We can then
> > > > >>> propagate the `lastStableOffsetLag` information in the RemoteLogMetadata
> > > > >>> events.
> > > >>>
> > > > >>> While reading the remote log segment, if the `lastStableOffsetLag` is 0,
> > > > >>> then there is no need to traverse the subsequent segments for aborted
> > > > >>> transactions. This covers the dominant case where the partition had no
> > > > >>> transactions at all.
> > > >>>
> > > > >>> With log compaction, the shrunk segments might get merged. One option
> > > > >>> is to take the max of `lastStableOffsetLag` and store it in the new
> > > > >>> LogSegment. Since tiered storage does not support compacted topics /
> > > > >>> historical compacted topics, we can omit this case.
> > > >>>
> > > >>> If this approach looks good, I can update the KIP with the details.
> > > >>>
> > > >>> --
> > > >>> Kamal
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <
> > divijvaidy...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Kamal
> > > >>>>
> > > >>>> Thanks for the bump. I have been thinking about this passively for
> > the
> > > >>>> past
> > > >>>> few days.
> > > >>>>
> > > >>>> The simplest solution is to store a state at segment level
> metadata.
> > > The
> > > >>>> state should specify whether the trx index is empty or not. It
> would
> > > be
> > > >>>> populated during segment archival. We would then iterate over the
> > > >>>> metadata
> > > >>>> for future segments without having to make a remote call to
> download
> > > the
> > > >>>> trx index itself.
> > > >>>>
> > > >>>> The other solution for storing state at a partition level wouldn't
> > > >>>> work, as
> > > >>>> you mentioned, because we will have to change the state on every
> > > >>>> mutation
> > > >>>> to the log i.e. at expiration of segments and append.
> > > >>>>
> > > >>>> I have been thinking whether we can do something better than the
> > > simple
> > > >>>> solution, hence the delay in replying. Let me tell you my half
> baked
> > > >>>> train
> > > >>>> of thoughts, perhaps, you can explore this as well. I have been
> > > thinking
> > > >>>> about using LSO (last stable offset) to handle the case when the
> > > >>>> partition
> > > >>>> never had any transactions. For a partition which never had any
> > > >>>> transaction, I would assume that the LSO is never initialized (or
> is
> > > >>>> equal
> > > >>>> to log start offset)? Or is it equal to HW in that case? This is
> > > >>>> something
> > > >>>> that I am yet to verify. If this idea works, then we would not
> have
> > to
> > > >>>> iterate through the metadata for the dominant case where the
> > partition
> > > >>>> had
> > > >>>> no transactions at all.
> > > >>>>
> > > >>>> --
> > > >>>> Divij Vaidya
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
> > > >>>> kamal.chandraprak...@gmail.com> wrote:
> > > >>>>
> > > >>>> > Bump. Please review this proposal.
> > > >>>> >
> > > >>>> >
> > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
> > > >>>> > kamal.chandraprak...@gmail.com> wrote:
> > > >>>> >
> > > >>>> > > Divij,
> > > >>>> > >
> > > >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4
> > review
> > > >>>> > > comments.
> > > >>>> > >
> > > >>>> > > > 4. Potential alternative - Instead of having an algorithm
> > where
> > > we
> > > >>>> > > traverse
> > > >>>> > > across segment metadata and looking for isTxnIdxEmpty flag,
> > should
> > > >>>> we
> > > >>>> > > directly introduce a nextSegmentWithTrxInx() function? This
> > would
> > > >>>> allow
> > > >>>> > > implementers to optimize the otherwise linear scan across
> > metadata
> > > >>>> for
> > > >>>> > all
> > > >>>> > > segments by using techniques such as skip list etc.
> > > >>>> > >
> > > >>>> > > This is a good point to optimize the scan. We need to maintain
> > the
> > > >>>> > > skip-list
> > > >>>> > > for each leader-epoch. With unclean leader election, some
> > brokers
> > > >>>> may not
> > > >>>> > > have
> > > >>>> > > the complete lineage. This will expand the scope of the work.
> > > >>>> > >
> > > >>>> > > In this version, we plan to optimize only for the below 2
> cases:
> > > >>>> > >
> > > >>>> > > 1. A partition does not have the transaction index for any of
> > the
> > > >>>> > uploaded
> > > >>>> > > segments.
> > > > >>>> > >    The individual log segments' `isTxnIdxEmpty` flags can be reduced
> > > > >>>> > >    to a single flag in RLMM (using the AND operator) that can serve
> > > > >>>> > >    the query - "Are all the transaction indexes empty for a
> > > > >>>> > >    partition?". If yes, then we can directly scan the local-log for
> > > > >>>> > >    aborted transactions.
> > > >>>> > > 2. A partition is produced using the transactional producer.
> The
> > > >>>> > > assumption made is that
> > > >>>> > >     the transaction will either commit/rollback within 15
> > minutes
> > > >>>> > >     (default transaction.max.timeout.ms = 15 mins), possibly
> we
> > > >>>> may have
> > > >>>> > > to search only
> > > >>>> > >     a few consecutive remote log segments to collect the
> aborted
> > > >>>> > > transactions.
> > > >>>> > > 3. A partition is being produced with both normal and
> > > transactional
> > > >>>> > > producers. In this case,
> > > >>>> > >     we will be doing linear traversal. Maintaining a skip-list
> > > might
> > > >>>> > > improve the performance but
> > > >>>> > >     we delegate the RLMM implementation to users. If
> implemented
> > > >>>> > > incorrectly, then it can lead
> > > >>>> > >     to delivery of the aborted transaction records to the
> > > consumer.
> > > >>>> > >
> > > >>>> > > I notice two drawbacks with the reduction method as proposed
> in
> > > the
> > > >>>> KIP:
> > > >>>> > >
> > > >>>> > > 1. Even if one segment has a transaction index, then we have
> to
> > > >>>> iterate
> > > >>>> > > over all the metadata events.
> > > > >>>> > > 2. Assume that there are 10 segments and segment-5 has a txn index.
> > > > >>>> > >    Once the first 6 segments are deleted due to a breach by
> > > > >>>> > >    time/size/start-offset, we should return `true` for the "Are all
> > > > >>>> > >    the transaction indexes empty for a partition?" query, but it will
> > > > >>>> > >    return `false` until the broker gets restarted, and we have to
> > > > >>>> > >    resort to iterating over all the metadata events.
> > > >>>> > >
> > > >>>> > > > 5. Potential alternative#2 - We know that we may want the
> > > indexes
> > > >>>> of
> > > >>>> > > multiple higher segments. Instead of fetching them
> sequentially,
> > > we
> > > >>>> could
> > > >>>> > > implement a parallel fetch or a pre-fetch for the indexes.
> This
> > > >>>> would
> > > >>>> > help
> > > >>>> > > hide the latency of sequentially fetching the trx indexes.
> > > >>>> > >
> > > > >>>> > > We can implement parallel-fetch/prefetch once tiered storage is GAed.
> > > > >>>> > > This feature will also be useful for prefetching the next remote log
> > > > >>>> > > segment, but it expands the scope of the work.
> > > >>>> > >
> > > >>>> > > > 6. Should the proposed API take "segmentId" as a parameter
> > > >>>> instead of
> > > >>>> > > "topicIdPartition"? Suggesting because isTxnIdEmpty is not a
> > > >>>> property of
> > > >>>> > a
> > > >>>> > > partition, instead it's a property of a specific segment.
> > > >>>> > >
> > > >>>> > > We propose to use the `topicIdPartition` in
> > > >>>> RemoteLogMetadataManager.
> > > >>>> > > The implementation can fold/reduce the value of the individual
> > log
> > > >>>> > segment
> > > >>>> > > `isTxnIdEmpty` flag. This is added to avoid scanning all the
> > > >>>> metadata
> > > >>>> > > events
> > > >>>> > > when the partition does not have a transaction index in any of
> > the
> > > >>>> > > segments.
> > > >>>> > >
> > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <
> > > >>>> divijvaidy...@gmail.com>
> > > >>>> > > wrote:
> > > >>>> > >
> > > >>>> > >> Hi Kamal
> > > >>>> > >>
> > > >>>> > >> Thanks for bringing this up. This is a problem worth solving.
> > We
> > > >>>> have
> > > >>>> > >> faced
> > > >>>> > >> this in situations where some Kafka clients default to
> > > >>>> read_committed
> > > >>>> > mode
> > > >>>> > >> and end up having high latencies for remote fetches due to
> this
> > > >>>> > traversal
> > > >>>> > >> across all segments.
> > > >>>> > >>
> > > >>>> > >> First some nits to clarify the KIP:
> > > >>>> > >> 1. The motivation should make it clear that traversal of all
> > > >>>> segments is
> > > >>>> > >> only in the worst case. If I am not mistaken (please correct
> me
> > > if
> > > >>>> > wrong),
> > > >>>> > >> the traversal stops when it has found a segment containing
> LSO.
> > > >>>> > >> 2. There is nothing like a non-txn topic. A transaction may
> be
> > > >>>> started
> > > >>>> > on
> > > >>>> > >> any topic. Perhaps, rephrase the statement in the KIP so that
> > it
> > > is
> > > >>>> > clear
> > > >>>> > >> to the reader.
> > > >>>> > >> 3. The hyperlink in the "the broker has to traverse all
> the..."
> > > >>>> seems
> > > >>>> > >> incorrect. Did you want to point to
> > > >>>> > >>
> > > >>>> > >>
> > > >>>> >
> > > >>>>
> > >
> >
> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> > > >>>> > >> ?
> > > >>>> > >> 4. In the testing section, could we add a test plan? For
> > > example, I
> > > >>>> > would
> > > >>>> > >> list down adding a test which would verify the number of
> calls
> > > >>>> made to
> > > >>>> > >> RLMM. This test would have a higher number of calls earlier
> vs.
> > > >>>> after
> > > >>>> > this
> > > >>>> > >> KIP.
> > > >>>> > >>
> > > >>>> > >> Other thoughts:
> > > >>>> > >> 4. Potential alternative - Instead of having an algorithm
> where
> > > we
> > > >>>> > >> traverse
> > > >>>> > >> across segment metadata and looking for isTxnIdxEmpty flag,
> > > should
> > > >>>> we
> > > >>>> > >> directly introduce a nextSegmentWithTrxInx() function? This
> > would
> > > >>>> allow
> > > >>>> > >> implementers to optimize the otherwise linear scan across
> > > metadata
> > > >>>> for
> > > >>>> > all
> > > >>>> > >> segments by using techniques such as skip list etc.
> > > >>>> > >> 5. Potential alternative#2 - We know that we may want the
> > indexes
> > > >>>> of
> > > >>>> > >> multiple higher segments. Instead of fetching them
> > sequentially,
> > > we
> > > >>>> > could
> > > >>>> > >> implement a parallel fetch or a pre-fetch for the indexes.
> This
> > > >>>> would
> > > >>>> > help
> > > >>>> > >> hide the latency of sequentially fetching the trx indexes.
> > > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter
> > > instead
> > > >>>> of
> > > >>>> > >> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a
> > > >>>> property
> > > >>>> > of a
> > > >>>> > >> partition, instead it's a property of a specific segment.
> > > >>>> > >>
> > > >>>> > >> Looking forward to hearing your thoughts about the
> > alternatives.
> > > >>>> Let's
> > > >>>> > get
> > > >>>> > >> this fixed.
> > > >>>> > >>
> > > >>>> > >> --
> > > >>>> > >> Divij Vaidya
> > > >>>> > >>
> > > >>>> > >>
> > > >>>> > >>
> > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> > > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
> > > >>>> > >>
> > > >>>> > >> > Hi all,
> > > >>>> > >> >
> > > >>>> > >> > I have opened a KIP-1058
> > > >>>> > >> > <
> > > >>>> > >> >
> > > >>>> > >>
> > > >>>> >
> > > >>>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > > >>>> > >> > >
> > > >>>> > >> > to reduce the pressure on remote storage when transactional
> > > >>>> consumers
> > > >>>> > >> are
> > > >>>> > >> > reading non-txn topics from remote storage.
> > > >>>> > >> >
> > > >>>> > >> >
> > > >>>> > >> >
> > > >>>> > >>
> > > >>>> >
> > > >>>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > > >>>> > >> >
> > > > >>>> > >> > Feedback and suggestions are welcome.
> > > >>>> > >> >
> > > >>>> > >> > Thanks,
> > > >>>> > >> > Kamal
> > > >>>> > >> >
> > > >>>> > >>
> > > >>>> > >
> > > >>>> >
> > > >>>>
> > > >>>
> > >
> >
>
