Bump for review.

If the additional proposal looks good, I'll append them to the KIP. PTAL.

New API in RLMM#nextRemoteLogSegmentMetadataWithTxnIndex

--
Kamal

On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi Christo,
>
> Thanks for the review!
>
> Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM
> helps to
> reduce the complexity of linear search. With this API, we have to:
>
> 1. Maintain one more skip-list [1] for each of the epochs in the partition
> in RLMM that might
>     increase the memory usage of TopicBased RLMM implementation.
>     1a) The skip-list will be empty when there are no aborted txn entries
> for a partition/epoch which is the predominant case.
>     1b) The skip-list will act as a duplicate when *most* of the segments
> have aborted txn entries, assuming aborted txn are quite low, this should
> be fine.
> 2. Change the logic to retrieve the aborted txns (we have to query the
> nextRLSMWithTxnIndex
>     for each of the leader-epoch).
> 3. Logic divergence from how we retrieve the aborted txn entries compared
> to the local-log.
>
> The approach looks good to me. If everyone is aligned, then we can proceed
> to add this API to RLMM.
>
> Another option I was thinking of is to capture the `lastStableOffsetLag`
> [2] while rotating the segment.
> But, that is a bigger change we can take later.
>
> [1]:
> https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
> [2]:
> https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432
>
>
> Thanks,
> Kamal
>
> On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com>
> wrote:
>
>> Heya,
>>
>> Apologies for the delay. I have been thinking about this problem recently
>> as well and while I believe storing a boolean in the metadata is good, I
>> think we can do better by introducing a new method to the RLMM along the
>> lines of
>>
>> Optional<RemoteLogSegmentMetadata>
>> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition
>> topicIdPartition,
>> int epochForOffset, long offset) throws RemoteStorageException
>>
>> This will help plugin implementers to build optimisations such as skip
>> lists which will give them the next segment quicker than a linear search.
>>
>> I am keen to hear your thoughts!
>>
>> Best,
>> Christo
>>
>> On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <
>> kamal.chandraprak...@gmail.com> wrote:
>>
>> > Hi Luke,
>> >
>> > Thanks for the review!
>> >
>> > > Do you think it is helpful if we store the "least abort start offset
>> in
>> > the
>> > segment", and -1 means no txnIndex. So that we can have a way to know
>> if we
>> > need to fetch this txn index or not.
>> >
>> > 1. No, this change won't have an effect. To find the upper-bound offset
>> > [1], we have to
>> >     fetch that segment's offset index file. The RemoteIndexCache [2]
>> > fetches all the 3
>> >     index files together and caches them for subsequent use, so this
>> > improvement
>> >     won't have an effect as the current segment txn index gets
>> downloaded
>> > anyway.
>> >
>> > 2. The reason for choosing boolean is to make the change backward
>> > compatible.
>> >      There can be existing RLM events for the uploaded segments. The
>> > default
>> >      value of `txnIdxEmpty` is false so the *old* RLM events are
>> assumed to
>> > contain
>> >      the txn index files and those files are downloaded if they exist.
>> >
>> > [1]:
>> >
>> >
>> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
>> > [2]:
>> >
>> >
>> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
>> >
>> > Thanks,
>> > Kamal
>> >
>> > On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
>> >
>> > > Hi Kamal,
>> > >
>> > > Sorry for the late review.
>> > > Thanks for the KIP, this will improve the transaction reading for
>> remote
>> > > storage.
>> > > Overall LGTM, just one minor thought:
>> > >
>> > > Currently, we only store the `TxnIndexEmpty` bool value in the segment
>> > > metadata.
>> > > Do you think it is helpful if we store the "least abort start offset
>> in
>> > the
>> > > segment", and -1 means no txnIndex. So that we can have a way to know
>> if
>> > we
>> > > need to fetch this txn index or not.
>> > >
>> > > Thanks.
>> > > Luke
>> > >
>> > > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
>> > > kamal.chandraprak...@gmail.com> wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > If there are no more comments, I'll start a voting thread soon.
>> > > >
>> > > > Thanks,
>> > > > Kamal
>> > > >
>> > > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
>> > > > kamal.chandraprak...@gmail.com> wrote:
>> > > >
>> > > > > Bumping this thread again for review!
>> > > > >
>> > > > > Reduced the scope of the proposal to minimum. We will be adding
>> only
>> > > one
>> > > > > field (txnIdxEmpty) to the
>> > > > > RemoteLogSegmentMetadata event which is backward compatible. PTAL.
>> > > > >
>> > > > > Thanks,
>> > > > > Kamal
>> > > > >
>> > > > >
>> > > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
>> > > > > kamal.chandraprak...@gmail.com> wrote:
>> > > > >
>> > > > >> Bumping this thread for KIP review!
>> > > > >>
>> > > > >> We can go for the simplest solution that is proposed in this KIP
>> and
>> > > > >> it can be improved in the subsequent iteration. PTAL.
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Kamal
>> > > > >>
>> > > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
>> > > > >> kamal.chandraprak...@gmail.com> wrote:
>> > > > >>
>> > > > >>> Hi Divij,
>> > > > >>>
>> > > > >>> Thanks for the review! And, sorry for the late reply.
>> > > > >>>
>> > > > >>> From the UnifiedLog.scala
>> > > > >>> <
>> > > >
>> > >
>> >
>> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427
>> > > > >
>> > > > >>> doc:
>> > > > >>>
>> > > > >>> """
>> > > > >>> The last stable offset (LSO) is defined as the first offset such
>> > that
>> > > > >>> all lower offsets have been "decided."
>> > > > >>>    * Non-transactional messages are considered decided
>> immediately,
>> > > but
>> > > > >>> transactional messages are only decided when
>> > > > >>>    * the corresponding COMMIT or ABORT marker is written. This
>> > > implies
>> > > > >>> that the last stable offset will be equal
>> > > > >>>    * to the high watermark if there are no transactional
>> messages
>> > in
>> > > > the
>> > > > >>> log. Note also that the LSO cannot advance
>> > > > >>>    * beyond the high watermark.
>> > > > >>> """
>> > > > >>> While rolling the active segment to passive, if LSO equals to
>> HW,
>> > > then
>> > > > >>> all the messages in that segment are
>> > > > >>> decided and we can store the `lastStableOffsetLag` as an
>> attribute
>> > in
>> > > > >>> the rolled segment. We can then propagate
>> > > > >>> the `lastStableOffsetLag` information in the RemoteLogMetadata
>> > > events.
>> > > > >>>
>> > > > >>> While reading the remote log segment, if the
>> `lastStableOffsetLag`
>> > is
>> > > > 0,
>> > > > >>> then there is no need to traverse to
>> > > > >>> the subsequent segments for aborted transactions which covers
>> the
>> > > case
>> > > > >>> for the dominant case where the
>> > > > >>> partition had no transactions at all.
>> > > > >>>
>> > > > >>> With Log compaction, the shrinked segments might get merged. One
>> > > option
>> > > > >>> is to take the max of `lastStableOffsetLag`
>> > > > >>> and store it in the new LogSegment. Since, the tiered storage
>> does
>> > > not
>> > > > >>> support compacted topics / historical compacted
>> > > > >>> topics, we can omit this case.
>> > > > >>>
>> > > > >>> If this approach looks good, I can update the KIP with the
>> details.
>> > > > >>>
>> > > > >>> --
>> > > > >>> Kamal
>> > > > >>>
>> > > > >>>
>> > > > >>>
>> > > > >>>
>> > > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <
>> > > divijvaidy...@gmail.com>
>> > > > >>> wrote:
>> > > > >>>
>> > > > >>>> Hi Kamal
>> > > > >>>>
>> > > > >>>> Thanks for the bump. I have been thinking about this passively
>> for
>> > > the
>> > > > >>>> past
>> > > > >>>> few days.
>> > > > >>>>
>> > > > >>>> The simplest solution is to store a state at segment level
>> > metadata.
>> > > > The
>> > > > >>>> state should specify whether the trx index is empty or not. It
>> > would
>> > > > be
>> > > > >>>> populated during segment archival. We would then iterate over
>> the
>> > > > >>>> metadata
>> > > > >>>> for future segments without having to make a remote call to
>> > download
>> > > > the
>> > > > >>>> trx index itself.
>> > > > >>>>
>> > > > >>>> The other solution for storing state at a partition level
>> wouldn't
>> > > > >>>> work, as
>> > > > >>>> you mentioned, because we will have to change the state on
>> every
>> > > > >>>> mutation
>> > > > >>>> to the log i.e. at expiration of segments and append.
>> > > > >>>>
>> > > > >>>> I have been thinking whether we can do something better than
>> the
>> > > > simple
>> > > > >>>> solution, hence the delay in replying. Let me tell you my half
>> > baked
>> > > > >>>> train
>> > > > >>>> of thoughts, perhaps, you can explore this as well. I have been
>> > > > thinking
>> > > > >>>> about using LSO (last stable offset) to handle the case when
>> the
>> > > > >>>> partition
>> > > > >>>> never had any transactions. For a partition which never had any
>> > > > >>>> transaction, I would assume that the LSO is never initialized
>> (or
>> > is
>> > > > >>>> equal
>> > > > >>>> to log start offset)? Or is it equal to HW in that case? This
>> is
>> > > > >>>> something
>> > > > >>>> that I am yet to verify. If this idea works, then we would not
>> > have
>> > > to
>> > > > >>>> iterate through the metadata for the dominant case where the
>> > > partition
>> > > > >>>> had
>> > > > >>>> no transactions at all.
>> > > > >>>>
>> > > > >>>> --
>> > > > >>>> Divij Vaidya
>> > > > >>>>
>> > > > >>>>
>> > > > >>>>
>> > > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
>> > > > >>>> kamal.chandraprak...@gmail.com> wrote:
>> > > > >>>>
>> > > > >>>> > Bump. Please review this proposal.
>> > > > >>>> >
>> > > > >>>> >
>> > > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
>> > > > >>>> > kamal.chandraprak...@gmail.com> wrote:
>> > > > >>>> >
>> > > > >>>> > > Divij,
>> > > > >>>> > >
>> > > > >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4
>> > > review
>> > > > >>>> > > comments.
>> > > > >>>> > >
>> > > > >>>> > > > 4. Potential alternative - Instead of having an algorithm
>> > > where
>> > > > we
>> > > > >>>> > > traverse
>> > > > >>>> > > across segment metadata and looking for isTxnIdxEmpty flag,
>> > > should
>> > > > >>>> we
>> > > > >>>> > > directly introduce a nextSegmentWithTrxInx() function? This
>> > > would
>> > > > >>>> allow
>> > > > >>>> > > implementers to optimize the otherwise linear scan across
>> > > metadata
>> > > > >>>> for
>> > > > >>>> > all
>> > > > >>>> > > segments by using techniques such as skip list etc.
>> > > > >>>> > >
>> > > > >>>> > > This is a good point to optimize the scan. We need to
>> maintain
>> > > the
>> > > > >>>> > > skip-list
>> > > > >>>> > > for each leader-epoch. With unclean leader election, some
>> > > brokers
>> > > > >>>> may not
>> > > > >>>> > > have
>> > > > >>>> > > the complete lineage. This will expand the scope of the
>> work.
>> > > > >>>> > >
>> > > > >>>> > > In this version, we plan to optimize only for the below 2
>> > cases:
>> > > > >>>> > >
>> > > > >>>> > > 1. A partition does not have the transaction index for any
>> of
>> > > the
>> > > > >>>> > uploaded
>> > > > >>>> > > segments.
>> > > > >>>> > >    The individual log segments `isTxnIdxEmpty` flag can be
>> > > reduced
>> > > > >>>> to a
>> > > > >>>> > > single flag
>> > > > >>>> > >    in RLMM (using AND operator) that can serve the query -
>> "Is
>> > > all
>> > > > >>>> the
>> > > > >>>> > > transaction indexes empty for a partition?".
>> > > > >>>> > >    If yes, then we can directly scan the local-log for
>> aborted
>> > > > >>>> > > transactions.
>> > > > >>>> > > 2. A partition is produced using the transactional
>> producer.
>> > The
>> > > > >>>> > > assumption made is that
>> > > > >>>> > >     the transaction will either commit/rollback within 15
>> > > minutes
>> > > > >>>> > >     (default transaction.max.timeout.ms = 15 mins),
>> possibly
>> > we
>> > > > >>>> may have
>> > > > >>>> > > to search only
>> > > > >>>> > >     a few consecutive remote log segments to collect the
>> > aborted
>> > > > >>>> > > transactions.
>> > > > >>>> > > 3. A partition is being produced with both normal and
>> > > > transactional
>> > > > >>>> > > producers. In this case,
>> > > > >>>> > >     we will be doing linear traversal. Maintaining a
>> skip-list
>> > > > might
>> > > > >>>> > > improve the performance but
>> > > > >>>> > >     we delegate the RLMM implementation to users. If
>> > implemented
>> > > > >>>> > > incorrectly, then it can lead
>> > > > >>>> > >     to delivery of the aborted transaction records to the
>> > > > consumer.
>> > > > >>>> > >
>> > > > >>>> > > I notice two drawbacks with the reduction method as
>> proposed
>> > in
>> > > > the
>> > > > >>>> KIP:
>> > > > >>>> > >
>> > > > >>>> > > 1. Even if one segment has a transaction index, then we
>> have
>> > to
>> > > > >>>> iterate
>> > > > >>>> > > over all the metadata events.
>> > > > >>>> > > 2. Assume that there are 10 segments and segment-5 has a
>> txn
>> > > > index.
>> > > > >>>> Once
>> > > > >>>> > > the first 6 segments are deleted,
>> > > > >>>> > >     due to breach by time/size/start-offset, then we should
>> > > return
>> > > > >>>> `true`
>> > > > >>>> > > for "Is all the transaction indexes empty for a partition?"
>> > > > >>>> > >    query but it will return `false` until the broker gets
>> > > > restarted
>> > > > >>>> and
>> > > > >>>> > we
>> > > > >>>> > > have to resort to iterate over all the metadata events.
>> > > > >>>> > >
>> > > > >>>> > > > 5. Potential alternative#2 - We know that we may want the
>> > > > indexes
>> > > > >>>> of
>> > > > >>>> > > multiple higher segments. Instead of fetching them
>> > sequentially,
>> > > > we
>> > > > >>>> could
>> > > > >>>> > > implement a parallel fetch or a pre-fetch for the indexes.
>> > This
>> > > > >>>> would
>> > > > >>>> > help
>> > > > >>>> > > hide the latency of sequentially fetching the trx indexes.
>> > > > >>>> > >
>> > > > >>>> > > We can implement parallel-fetch/prefetch once the tiered
>> > storage
>> > > > is
>> > > > >>>> GAed.
>> > > > >>>> > > Since this feature will be useful
>> > > > >>>> > > to prefetch the next remote log segment and it expands the
>> > scope
>> > > > of
>> > > > >>>> the
>> > > > >>>> > > work.
>> > > > >>>> > >
>> > > > >>>> > > > 6. Should the proposed API take "segmentId" as a
>> parameter
>> > > > >>>> instead of
>> > > > >>>> > > "topicIdPartition"? Suggesting because isTxnIdEmpty is not
>> a
>> > > > >>>> property of
>> > > > >>>> > a
>> > > > >>>> > > partition, instead it's a property of a specific segment.
>> > > > >>>> > >
>> > > > >>>> > > We propose to use the `topicIdPartition` in
>> > > > >>>> RemoteLogMetadataManager.
>> > > > >>>> > > The implementation can fold/reduce the value of the
>> individual
>> > > log
>> > > > >>>> > segment
>> > > > >>>> > > `isTxnIdEmpty` flag. This is added to avoid scanning all
>> the
>> > > > >>>> metadata
>> > > > >>>> > > events
>> > > > >>>> > > when the partition does not have a transaction index in
>> any of
>> > > the
>> > > > >>>> > > segments.
>> > > > >>>> > >
>> > > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <
>> > > > >>>> divijvaidy...@gmail.com>
>> > > > >>>> > > wrote:
>> > > > >>>> > >
>> > > > >>>> > >> Hi Kamal
>> > > > >>>> > >>
>> > > > >>>> > >> Thanks for bringing this up. This is a problem worth
>> solving.
>> > > We
>> > > > >>>> have
>> > > > >>>> > >> faced
>> > > > >>>> > >> this in situations where some Kafka clients default to
>> > > > >>>> read_committed
>> > > > >>>> > mode
>> > > > >>>> > >> and end up having high latencies for remote fetches due to
>> > this
>> > > > >>>> > traversal
>> > > > >>>> > >> across all segments.
>> > > > >>>> > >>
>> > > > >>>> > >> First some nits to clarify the KIP:
>> > > > >>>> > >> 1. The motivation should make it clear that traversal of
>> all
>> > > > >>>> segments is
>> > > > >>>> > >> only in the worst case. If I am not mistaken (please
>> correct
>> > me
>> > > > if
>> > > > >>>> > wrong),
>> > > > >>>> > >> the traversal stops when it has found a segment containing
>> > LSO.
>> > > > >>>> > >> 2. There is nothing like a non-txn topic. A transaction
>> may
>> > be
>> > > > >>>> started
>> > > > >>>> > on
>> > > > >>>> > >> any topic. Perhaps, rephrase the statement in the KIP so
>> that
>> > > it
>> > > > is
>> > > > >>>> > clear
>> > > > >>>> > >> to the reader.
>> > > > >>>> > >> 3. The hyperlink in the "the broker has to traverse all
>> > the..."
>> > > > >>>> seems
>> > > > >>>> > >> incorrect. Did you want to point to
>> > > > >>>> > >>
>> > > > >>>> > >>
>> > > > >>>> >
>> > > > >>>>
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
>> > > > >>>> > >> ?
>> > > > >>>> > >> 4. In the testing section, could we add a test plan? For
>> > > > example, I
>> > > > >>>> > would
>> > > > >>>> > >> list down adding a test which would verify the number of
>> > calls
>> > > > >>>> made to
>> > > > >>>> > >> RLMM. This test would have a higher number of calls
>> earlier
>> > vs.
>> > > > >>>> after
>> > > > >>>> > this
>> > > > >>>> > >> KIP.
>> > > > >>>> > >>
>> > > > >>>> > >> Other thoughts:
>> > > > >>>> > >> 4. Potential alternative - Instead of having an algorithm
>> > where
>> > > > we
>> > > > >>>> > >> traverse
>> > > > >>>> > >> across segment metadata and looking for isTxnIdxEmpty
>> flag,
>> > > > should
>> > > > >>>> we
>> > > > >>>> > >> directly introduce a nextSegmentWithTrxInx() function?
>> This
>> > > would
>> > > > >>>> allow
>> > > > >>>> > >> implementers to optimize the otherwise linear scan across
>> > > > metadata
>> > > > >>>> for
>> > > > >>>> > all
>> > > > >>>> > >> segments by using techniques such as skip list etc.
>> > > > >>>> > >> 5. Potential alternative#2 - We know that we may want the
>> > > indexes
>> > > > >>>> of
>> > > > >>>> > >> multiple higher segments. Instead of fetching them
>> > > sequentially,
>> > > > we
>> > > > >>>> > could
>> > > > >>>> > >> implement a parallel fetch or a pre-fetch for the indexes.
>> > This
>> > > > >>>> would
>> > > > >>>> > help
>> > > > >>>> > >> hide the latency of sequentially fetching the trx indexes.
>> > > > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter
>> > > > instead
>> > > > >>>> of
>> > > > >>>> > >> "topicIdPartition"? Suggesting because isTxnIdEmpty is
>> not a
>> > > > >>>> property
>> > > > >>>> > of a
>> > > > >>>> > >> partition, instead it's a property of a specific segment.
>> > > > >>>> > >>
>> > > > >>>> > >> Looking forward to hearing your thoughts about the
>> > > alternatives.
>> > > > >>>> Let's
>> > > > >>>> > get
>> > > > >>>> > >> this fixed.
>> > > > >>>> > >>
>> > > > >>>> > >> --
>> > > > >>>> > >> Divij Vaidya
>> > > > >>>> > >>
>> > > > >>>> > >>
>> > > > >>>> > >>
>> > > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
>> > > > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
>> > > > >>>> > >>
>> > > > >>>> > >> > Hi all,
>> > > > >>>> > >> >
>> > > > >>>> > >> > I have opened a KIP-1058
>> > > > >>>> > >> > <
>> > > > >>>> > >> >
>> > > > >>>> > >>
>> > > > >>>> >
>> > > > >>>>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>> > > > >>>> > >> > >
>> > > > >>>> > >> > to reduce the pressure on remote storage when
>> transactional
>> > > > >>>> consumers
>> > > > >>>> > >> are
>> > > > >>>> > >> > reading non-txn topics from remote storage.
>> > > > >>>> > >> >
>> > > > >>>> > >> >
>> > > > >>>> > >> >
>> > > > >>>> > >>
>> > > > >>>> >
>> > > > >>>>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>> > > > >>>> > >> >
>> > > > >>>> > >> > Feedbacks and suggestions are welcome.
>> > > > >>>> > >> >
>> > > > >>>> > >> > Thanks,
>> > > > >>>> > >> > Kamal
>> > > > >>>> > >> >
>> > > > >>>> > >>
>> > > > >>>> > >
>> > > > >>>> >
>> > > > >>>>
>> > > > >>>
>> > > >
>> > >
>> >
>>
>

Reply via email to