Hi all,

If the review comments have been addressed and the KIP looks good, please
submit your vote on the voting thread.
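
For anyone skimming the thread, here is a rough, self-contained sketch of
the per-epoch skip-list lookup discussed below for
nextRemoteLogSegmentMetadataWithTxnIndex. It is not the code in the draft
PR. SegmentMeta and EpochTxnIndexState are hypothetical names used only for
illustration, and the sketch assumes that only segments with a non-empty
txn index are tracked.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for RemoteLogSegmentMetadata; only the fields needed here.
final class SegmentMeta {
    final long baseOffset;
    final long endOffset;
    final boolean txnIdxEmpty;

    SegmentMeta(long baseOffset, long endOffset, boolean txnIdxEmpty) {
        this.baseOffset = baseOffset;
        this.endOffset = endOffset;
        this.txnIdxEmpty = txnIdxEmpty;
    }
}

// Hypothetical per-leader-epoch state: one skip-list keyed by base offset.
final class EpochTxnIndexState {
    // Only segments whose txn index is non-empty are stored, so the map
    // stays empty in the common case of no aborted transactions.
    private final ConcurrentSkipListMap<Long, SegmentMeta> withTxnIndex =
            new ConcurrentSkipListMap<>();

    void add(SegmentMeta segment) {
        if (!segment.txnIdxEmpty) {
            withTxnIndex.put(segment.baseOffset, segment);
        }
    }

    void remove(SegmentMeta segment) {
        withTxnIndex.remove(segment.baseOffset);
    }

    // Returns the first tracked segment that contains `offset` or starts after it.
    // `offset` need not be a segment base offset (e.g. an epoch start offset).
    Optional<SegmentMeta> next(long offset) {
        Map.Entry<Long, SegmentMeta> floor = withTxnIndex.floorEntry(offset);
        if (floor != null && floor.getValue().endOffset >= offset) {
            return Optional.of(floor.getValue());
        }
        Map.Entry<Long, SegmentMeta> ceiling = withTxnIndex.ceilingEntry(offset);
        return ceiling == null ? Optional.empty() : Optional.of(ceiling.getValue());
    }
}
```

The floor/ceiling lookups are logarithmic in the number of tracked
segments, so a caller would not need to walk the metadata of every segment
in the epoch.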

Thanks,
Kamal
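
P.S. For comparison with the iterator-based isEmpty() quoted below, this is
a minimal sketch of the Files.size() variant that was also suggested in the
thread. FileBackedTxnIndexSketch is a hypothetical class, not the real
TransactionIndex, and it assumes that a missing file and a zero-length file
both mean "no aborted txn entries".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical file-backed index used only to illustrate the emptiness check.
final class FileBackedTxnIndexSketch {
    private final Path file; // the file may legitimately not exist

    FileBackedTxnIndexSketch(Path file) {
        this.file = file;
    }

    // True when the index file is absent or has zero bytes.
    // The file contents are never read into memory.
    boolean isEmpty() {
        try {
            return !Files.exists(file) || Files.size(file) == 0L;
        } catch (IOException e) {
            // Also covers the file being deleted between the two calls.
            throw new UncheckedIOException(e);
        }
    }
}
```

Either variant keeps the file() accessor out of the public surface; the
size-based one simply avoids creating an iterator.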

On Sat, Nov 2, 2024 at 7:52 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> I've opened a PR which adds an integration test
> <https://github.com/apache/kafka/pull/17668/files#diff-a60b518846fc0f770164d55b6d7cd31e03a002514377578c80c6e22cc120af40R88>
> to show the impact of this change/KIP. PTAL.
>
> --
> Kamal
>
> On Fri, Nov 1, 2024 at 3:31 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
>> Thanks for the review!
>>
>> > 5
>> Updated the KIP to include the `isEmpty` method in the transaction index
>>
>> > 6
>> You're right. The offset parameter will be equal to the
>> next-segment-to-consider base offset.
>> But, the API introduced in RLMM is for *one* epoch. The next epoch
>> start-offset may not be
>> the base-offset. You can refer to this line
>> <https://github.com/apache/kafka/pull/17659/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1857>
>> in the draft PR.
>>
>>
>> On Fri, Nov 1, 2024 at 3:16 PM Divij Vaidya <divijvaidy...@gmail.com>
>> wrote:
>>
>>> Last few things -
>>>
>>> # 5
>>> About setting the TrxIndexEmpty field, could we introduce an isEmpty()
>>> function in TransactionIndex which has the following implementation:
>>>
>>> public boolean isEmpty() {
>>>     return !iterable().iterator().hasNext();
>>> }
>>>
>>>
>>> The advantages of this approach are:
>>> 1. It works for both cases when the file is not present and also when the
>>> file is present but is empty.
>>> 2. It prevents leaking the underlying implementation of TransactionIndex
>>> outside via the file() method. I think that making file() public is an
>>> implementation leak (for example, what if the trx index is not
>>> file-based?).
>>>
>>>
>>> #6
>>> In the documentation for nextSegmentWithTxnIndex, the offset parameter
>>> should be equal to the next-segment-to-consider's base offset, no?
>>> I assume that we will add a new fetch here with nextSegmentBaseOffset
>>>
>>> https://github.com/apache/kafka/blob/346fdbafc539bc48bb66eedae89a15e240007fd9/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1801
>>> . Is there a case where the parameter "offset" will not be equal to the
>>> baseOffset of a segment?
>>>
>>> --
>>> Divij Vaidya
>>>
>>>
>>>
>>> On Fri, Nov 1, 2024 at 10:26 AM Kamal Chandraprakash <
>>> kamal.chandraprak...@gmail.com> wrote:
>>>
>>> > Hi Divij,
>>> >
>>> > Thanks for the detailed review!
>>> >
>>> > > 1, 2, 3, 4
>>> > Updated the KIP-1058
>>> > <
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions
>>> > >
>>> > with the feedback received and also opened a draft PR, #17659
>>> > <https://github.com/apache/kafka/pull/17659/files>, for reference.
>>> > PTAL.
>>> >
>>> > > 5. How are we determining the value of the TrxIndexEmpty field on
>>> > > segment rotation?
>>> > The transaction index file is optional: the file does not exist when
>>> > there are no aborted txn entries for a segment, so we will use the
>>> > file null check. Also updated this in the KIP.
>>> >
>>> > Thanks,
>>> > Kamal
>>> >
>>> >
>>> > On Tue, Oct 29, 2024 at 8:47 PM Divij Vaidya <divijvaidy...@gmail.com>
>>> > wrote:
>>> >
>>> > > A few more points to discuss (please add to the KIP as well)
>>> > >
>>> > > 5. How are we determining the value of the TrxIndexEmpty field on
>>> segment
>>> > > rotation?
>>> > >
>>> > > One option is to do a boolean txnIdxEmpty =
>>> > > segment.txnIndex().allAbortedTxns().isEmpty() but this will have an
>>> > > overhead of reading the contents of the file and storing them in
>>> memory,
>>> > > when we have a non-empty index.
>>> > > The other option (preferred) is to add an isEmpty() public method to
>>> the
>>> > > TransactionIndex and perform a segment.txnIndex().isEmpty() check
>>> which
>>> > > will internally use Files.size() java API.
>>> > >
>>> > > On Tue, Oct 29, 2024 at 1:21 PM Divij Vaidya <
>>> divijvaidy...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Let's get the ball rolling (again) on this one.
>>> > > >
>>> > > > Kamal, could you please add the following to the KIP:
>>> > > > 1. the API as discussed above. Please add the failure modes for
>>> this
>>> > API
>>> > > > as well such as the exceptions thrown and a recommendation on how a
>>> > > caller
>>> > > > is expected to handle those. I am assuming that the three
>>> parameters
>>> > for
>>> > > > this API will be topicPartition, epoch and offset.
>>> > > > 2. implementation details for Topic based RLMM. I am assuming that
>>> the
>>> > > > plugin will default the field to false if this field is absent
>>> (case of
>>> > > old
>>> > > > metadata).
>>> > > > 3. In the test plan section, additionally, we need to assert that
>>> we
>>> > > don't
>>> > > > read metadata for all segments (i.e. it is not a linear search)
>>> from
>>> > the
>>> > > > Topic based RLMM.
>>> > > > 4. in the compatibility section, please document how the existing
>>> > > clusters
>>> > > > with Tiered Storage metadata will work during/after a rolling
>>> upgrade
>>> > to
>>> > > a
>>> > > > version which contains this new change.
>>> > > >
>>> > > > --
>>> > > > Divij Vaidya
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Fri, Oct 11, 2024 at 12:26 PM Kamal Chandraprakash <
>>> > > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >
>>> > > >> Bump for review.
>>> > > >>
>>> > > >> If the additional proposal looks good, I'll append them to the
>>> KIP.
>>> > > PTAL.
>>> > > >>
>>> > > >> New API in RLMM#nextRemoteLogSegmentMetadataWithTxnIndex
>>> > > >>
>>> > > >> --
>>> > > >> Kamal
>>> > > >>
>>> > > >> On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <
>>> > > >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >>
>>> > > >> > Hi Christo,
>>> > > >> >
>>> > > >> > Thanks for the review!
>>> > > >> >
>>> > > >> > Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in
>>> > RLMM
>>> > > >> > helps to
>>> > > >> > reduce the complexity of linear search. With this API, we have
>>> to:
>>> > > >> >
>>> > > >> > 1. Maintain one more skip-list [1] for each of the epochs in the
>>> > > >> partition
>>> > > >> > in RLMM that might
>>> > > >> >     increase the memory usage of TopicBased RLMM implementation.
>>> > > >> >     1a) The skip-list will be empty when there are no aborted
>>> txn
>>> > > >> entries
>>> > > >> > for a partition/epoch which is the predominant case.
>>> > > >> >     1b) The skip-list will act as a duplicate when *most* of the
>>> > > >> segments
>>> > > >> > have aborted txn entries, assuming aborted txn are quite low,
>>> this
>>> > > >> should
>>> > > >> > be fine.
>>> > > >> > 2. Change the logic to retrieve the aborted txns (we have to
>>> query
>>> > the
>>> > > >> > nextRLSMWithTxnIndex
>>> > > >> >     for each of the leader-epoch).
>>> > > >> > 3. Logic divergence from how we retrieve the aborted txn entries
>>> > > >> compared
>>> > > >> > to the local-log.
>>> > > >> >
>>> > > >> > The approach looks good to me. If everyone is aligned, then we
>>> can
>>> > > >> proceed
>>> > > >> > to add this API to RLMM.
>>> > > >> >
>>> > > >> > Another option I was thinking of is to capture the
>>> > > `lastStableOffsetLag`
>>> > > >> > [2] while rotating the segment.
>>> > > >> > But, that is a bigger change we can take later.
>>> > > >> >
>>> > > >> > [1]:
>>> > > >> >
>>> > > >>
>>> > >
>>> >
>>> https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
>>> > > >> > [2]:
>>> > > >> >
>>> > > >>
>>> > >
>>> >
>>> https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432
>>> > > >> >
>>> > > >> >
>>> > > >> > Thanks,
>>> > > >> > Kamal
>>> > > >> >
>>> > > >> > On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <
>>> > christolo...@gmail.com>
>>> > > >> > wrote:
>>> > > >> >
>>> > > >> >> Heya,
>>> > > >> >>
>>> > > >> >> Apologies for the delay. I have been thinking about this
>>> problem
>>> > > >> recently
>>> > > >> >> as well and while I believe storing a boolean in the metadata
>>> is
>>> > > good,
>>> > > >> I
>>> > > >> >> think we can do better by introducing a new method to the RLMM
>>> > along
>>> > > >> the
>>> > > >> >> lines of
>>> > > >> >>
>>> > > >> >> Optional<RemoteLogSegmentMetadata>
>>> > > >> >> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition
>>> > > >> >> topicIdPartition,
>>> > > >> >> int epochForOffset, long offset) throws RemoteStorageException
>>> > > >> >>
>>> > > >> >> This will help plugin implementers to build optimisations such
>>> as
>>> > > skip
>>> > > >> >> lists which will give them the next segment quicker than a
>>> linear
>>> > > >> search.
>>> > > >> >>
>>> > > >> >> I am keen to hear your thoughts!
>>> > > >> >>
>>> > > >> >> Best,
>>> > > >> >> Christo
>>> > > >> >>
>>> > > >> >> On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <
>>> > > >> >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >>
>>> > > >> >> > Hi Luke,
>>> > > >> >> >
>>> > > >> >> > Thanks for the review!
>>> > > >> >> >
>>> > > >> >> > > Do you think it is helpful if we store the "least abort
>>> start
>>> > > >> offset
>>> > > >> >> in
>>> > > >> >> > the
>>> > > >> >> > segment", and -1 means no txnIndex. So that we can have a
>>> way to
>>> > > know
>>> > > >> >> if we
>>> > > >> >> > need to fetch this txn index or not.
>>> > > >> >> >
>>> > > >> >> > 1. No, this change won't have an effect. To find the
>>> upper-bound
>>> > > >> offset
>>> > > >> >> > [1], we have to
>>> > > >> >> >     fetch that segment's offset index file. The
>>> RemoteIndexCache
>>> > > [2]
>>> > > >> >> > fetches all the 3
>>> > > >> >> >     index files together and caches them for subsequent use,
>>> so
>>> > > this
>>> > > >> >> > improvement
>>> > > >> >> >     won't have an effect as the current segment txn index
>>> gets
>>> > > >> >> downloaded
>>> > > >> >> > anyway.
>>> > > >> >> >
>>> > > >> >> > 2. The reason for choosing boolean is to make the change
>>> backward
>>> > > >> >> > compatible.
>>> > > >> >> >      There can be existing RLM events for the uploaded
>>> segments.
>>> > > The
>>> > > >> >> > default
>>> > > >> >> >      value of `txnIdxEmpty` is false so the *old* RLM events
>>> are
>>> > > >> >> assumed to
>>> > > >> >> > contain
>>> > > >> >> >      the txn index files and those files are downloaded if
>>> they
>>> > > >> exist.
>>> > > >> >> >
>>> > > >> >> > [1]:
>>> > > >> >> >
>>> > > >> >> >
>>> > > >> >>
>>> > > >>
>>> > >
>>> >
>>> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
>>> > > >> >> > [2]:
>>> > > >> >> >
>>> > > >> >> >
>>> > > >> >>
>>> > > >>
>>> > >
>>> >
>>> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
>>> > > >> >> >
>>> > > >> >> > Thanks,
>>> > > >> >> > Kamal
>>> > > >> >> >
>>> > > >> >> > On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com>
>>> > > wrote:
>>> > > >> >> >
>>> > > >> >> > > Hi Kamal,
>>> > > >> >> > >
>>> > > >> >> > > Sorry for the late review.
>>> > > >> >> > > Thanks for the KIP, this will improve the transaction
>>> reading
>>> > for
>>> > > >> >> remote
>>> > > >> >> > > storage.
>>> > > >> >> > > Overall LGTM, just one minor thought:
>>> > > >> >> > >
>>> > > >> >> > > Currently, we only store the `TxnIndexEmpty` bool value in
>>> the
>>> > > >> segment
>>> > > >> >> > > metadata.
>>> > > >> >> > > Do you think it is helpful if we store the "least abort
>>> start
>>> > > >> offset
>>> > > >> >> in
>>> > > >> >> > the
>>> > > >> >> > > segment", and -1 means no txnIndex. So that we can have a
>>> way
>>> > to
>>> > > >> know
>>> > > >> >> if
>>> > > >> >> > we
>>> > > >> >> > > need to fetch this txn index or not.
>>> > > >> >> > >
>>> > > >> >> > > Thanks.
>>> > > >> >> > > Luke
>>> > > >> >> > >
>>> > > >> >> > > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
>>> > > >> >> > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > >
>>> > > >> >> > > > Hi all,
>>> > > >> >> > > >
>>> > > >> >> > > > If there are no more comments, I'll start a voting thread
>>> > soon.
>>> > > >> >> > > >
>>> > > >> >> > > > Thanks,
>>> > > >> >> > > > Kamal
>>> > > >> >> > > >
>>> > > >> >> > > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
>>> > > >> >> > > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > >
>>> > > >> >> > > > > Bumping this thread again for review!
>>> > > >> >> > > > >
>>> > > >> >> > > > > Reduced the scope of the proposal to the minimum. We will
>>> be
>>> > > adding
>>> > > >> >> only
>>> > > >> >> > > one
>>> > > >> >> > > > > field (txnIdxEmpty) to the
>>> > > >> >> > > > > RemoteLogSegmentMetadata event which is backward
>>> > compatible.
>>> > > >> PTAL.
>>> > > >> >> > > > >
>>> > > >> >> > > > > Thanks,
>>> > > >> >> > > > > Kamal
>>> > > >> >> > > > >
>>> > > >> >> > > > >
>>> > > >> >> > > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
>>> > > >> >> > > > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >
>>> > > >> >> > > > >> Bumping this thread for KIP review!
>>> > > >> >> > > > >>
>>> > > >> >> > > > >> We can go for the simplest solution that is proposed
>>> in
>>> > this
>>> > > >> KIP
>>> > > >> >> and
>>> > > >> >> > > > >> it can be improved in the subsequent iteration. PTAL.
>>> > > >> >> > > > >>
>>> > > >> >> > > > >> Thanks,
>>> > > >> >> > > > >> Kamal
>>> > > >> >> > > > >>
>>> > > >> >> > > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
>>> > > >> >> > > > >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>
>>> > > >> >> > > > >>> Hi Divij,
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> Thanks for the review! And, sorry for the late reply.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> From the UnifiedLog.scala
>>> > > >> >> > > > >>> <
>>> > > >> >> > > >
>>> > > >> >> > >
>>> > > >> >> >
>>> > > >> >>
>>> > > >>
>>> > >
>>> >
>>> https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427
>>> > > >> >> > > > >
>>> > > >> >> > > > >>> doc:
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> """
>>> > > >> >> > > > >>> The last stable offset (LSO) is defined as the first
>>> > offset
>>> > > >> such
>>> > > >> >> > that
>>> > > >> >> > > > >>> all lower offsets have been "decided."
>>> > > >> >> > > > >>>    * Non-transactional messages are considered
>>> decided
>>> > > >> >> immediately,
>>> > > >> >> > > but
>>> > > >> >> > > > >>> transactional messages are only decided when
>>> > > >> >> > > > >>>    * the corresponding COMMIT or ABORT marker is
>>> written.
>>> > > >> This
>>> > > >> >> > > implies
>>> > > >> >> > > > >>> that the last stable offset will be equal
>>> > > >> >> > > > >>>    * to the high watermark if there are no
>>> transactional
>>> > > >> >> messages
>>> > > >> >> > in
>>> > > >> >> > > > the
>>> > > >> >> > > > >>> log. Note also that the LSO cannot advance
>>> > > >> >> > > > >>>    * beyond the high watermark.
>>> > > >> >> > > > >>> """
>>> > > >> >> > > > >>> While rolling the active segment to passive, if LSO
>>> > equals
>>> > > to
>>> > > >> >> HW,
>>> > > >> >> > > then
>>> > > >> >> > > > >>> all the messages in that segment are
>>> > > >> >> > > > >>> decided and we can store the `lastStableOffsetLag`
>>> as an
>>> > > >> >> attribute
>>> > > >> >> > in
>>> > > >> >> > > > >>> the rolled segment. We can then propagate
>>> > > >> >> > > > >>> the `lastStableOffsetLag` information in the
>>> > > >> RemoteLogMetadata
>>> > > >> >> > > events.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> While reading the remote log segment, if the
>>> > > >> >> `lastStableOffsetLag`
>>> > > >> >> > is
>>> > > >> >> > > > 0,
>>> > > >> >> > > > >>> then there is no need to traverse to
>>> > > >> >> > > > >>> the subsequent segments for aborted transactions
>>> which
>>> > > covers
>>> > > >> >> the
>>> > > >> >> > > case
>>> > > >> >> > > > >>> for the dominant case where the
>>> > > >> >> > > > >>> partition had no transactions at all.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> With log compaction, the shrunk segments might get
>>> > > merged.
>>> > > >> One
>>> > > >> >> > > option
>>> > > >> >> > > > >>> is to take the max of `lastStableOffsetLag`
>>> > > >> >> > > > >>> and store it in the new LogSegment. Since, the tiered
>>> > > storage
>>> > > >> >> does
>>> > > >> >> > > not
>>> > > >> >> > > > >>> support compacted topics / historical compacted
>>> > > >> >> > > > >>> topics, we can omit this case.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> If this approach looks good, I can update the KIP
>>> with
>>> > the
>>> > > >> >> details.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> --
>>> > > >> >> > > > >>> Kamal
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <
>>> > > >> >> > > divijvaidy...@gmail.com>
>>> > > >> >> > > > >>> wrote:
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>> Hi Kamal
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> Thanks for the bump. I have been thinking about this
>>> > > >> passively
>>> > > >> >> for
>>> > > >> >> > > the
>>> > > >> >> > > > >>>> past
>>> > > >> >> > > > >>>> few days.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> The simplest solution is to store a state at segment
>>> > level
>>> > > >> >> > metadata.
>>> > > >> >> > > > The
>>> > > >> >> > > > >>>> state should specify whether the trx index is empty
>>> or
>>> > > not.
>>> > > >> It
>>> > > >> >> > would
>>> > > >> >> > > > be
>>> > > >> >> > > > >>>> populated during segment archival. We would then
>>> iterate
>>> > > >> over
>>> > > >> >> the
>>> > > >> >> > > > >>>> metadata
>>> > > >> >> > > > >>>> for future segments without having to make a remote
>>> call
>>> > > to
>>> > > >> >> > download
>>> > > >> >> > > > the
>>> > > >> >> > > > >>>> trx index itself.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> The other solution for storing state at a partition
>>> > level
>>> > > >> >> wouldn't
>>> > > >> >> > > > >>>> work, as
>>> > > >> >> > > > >>>> you mentioned, because we will have to change the
>>> state
>>> > on
>>> > > >> >> every
>>> > > >> >> > > > >>>> mutation
>>> > > >> >> > > > >>>> to the log i.e. at expiration of segments and
>>> append.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> I have been thinking whether we can do something
>>> better
>>> > > than
>>> > > >> >> the
>>> > > >> >> > > > simple
>>> > > >> >> > > > >>>> solution, hence the delay in replying. Let me tell
>>> you
>>> > my
>>> > > >> half
>>> > > >> >> > baked
>>> > > >> >> > > > >>>> train
>>> > > >> >> > > > >>>> of thoughts, perhaps, you can explore this as well.
>>> I
>>> > have
>>> > > >> been
>>> > > >> >> > > > thinking
>>> > > >> >> > > > >>>> about using LSO (last stable offset) to handle the
>>> case
>>> > > when
>>> > > >> >> the
>>> > > >> >> > > > >>>> partition
>>> > > >> >> > > > >>>> never had any transactions. For a partition which
>>> never
>>> > > had
>>> > > >> any
>>> > > >> >> > > > >>>> transaction, I would assume that the LSO is never
>>> > > >> initialized
>>> > > >> >> (or
>>> > > >> >> > is
>>> > > >> >> > > > >>>> equal
>>> > > >> >> > > > >>>> to log start offset)? Or is it equal to HW in that
>>> case?
>>> > > >> This
>>> > > >> >> is
>>> > > >> >> > > > >>>> something
>>> > > >> >> > > > >>>> that I am yet to verify. If this idea works, then we
>>> > would
>>> > > >> not
>>> > > >> >> > have
>>> > > >> >> > > to
>>> > > >> >> > > > >>>> iterate through the metadata for the dominant case
>>> where
>>> > > the
>>> > > >> >> > > partition
>>> > > >> >> > > > >>>> had
>>> > > >> >> > > > >>>> no transactions at all.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> --
>>> > > >> >> > > > >>>> Divij Vaidya
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal
>>> Chandraprakash <
>>> > > >> >> > > > >>>> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> > Bump. Please review this proposal.
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal
>>> Chandraprakash <
>>> > > >> >> > > > >>>> > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>> > > Divij,
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > Thanks for the review! Updated the KIP with 1,
>>> 2, 3,
>>> > > >> and 4
>>> > > >> >> > > review
>>> > > >> >> > > > >>>> > > comments.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > > 4. Potential alternative - Instead of having
>>> an
>>> > > >> algorithm
>>> > > >> >> > > where
>>> > > >> >> > > > we
>>> > > >> >> > > > >>>> > > traverse
>>> > > >> >> > > > >>>> > > across segment metadata and looking for
>>> > isTxnIdxEmpty
>>> > > >> flag,
>>> > > >> >> > > should
>>> > > >> >> > > > >>>> we
>>> > > >> >> > > > >>>> > > directly introduce a nextSegmentWithTrxInx()
>>> > function?
>>> > > >> This
>>> > > >> >> > > would
>>> > > >> >> > > > >>>> allow
>>> > > >> >> > > > >>>> > > implementers to optimize the otherwise linear
>>> scan
>>> > > >> across
>>> > > >> >> > > metadata
>>> > > >> >> > > > >>>> for
>>> > > >> >> > > > >>>> > all
>>> > > >> >> > > > >>>> > > segments by using techniques such as skip list
>>> etc.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > This is a good point to optimize the scan. We
>>> need
>>> > to
>>> > > >> >> maintain
>>> > > >> >> > > the
>>> > > >> >> > > > >>>> > > skip-list
>>> > > >> >> > > > >>>> > > for each leader-epoch. With unclean leader
>>> election,
>>> > > >> some
>>> > > >> >> > > brokers
>>> > > >> >> > > > >>>> may not
>>> > > >> >> > > > >>>> > > have
>>> > > >> >> > > > >>>> > > the complete lineage. This will expand the
>>> scope of
>>> > > the
>>> > > >> >> work.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > In this version, we plan to optimize only for
>>> the
>>> > > below
>>> > > >> 2
>>> > > >> >> > cases:
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > 1. A partition does not have the transaction
>>> index
>>> > for
>>> > > >> any
>>> > > >> >> of
>>> > > >> >> > > the
>>> > > >> >> > > > >>>> > uploaded
>>> > > >> >> > > > >>>> > > segments.
>>> > > >> >> > > > >>>> > >    The individual log segments `isTxnIdxEmpty`
>>> flag
>>> > > can
>>> > > >> be
>>> > > >> >> > > reduced
>>> > > >> >> > > > >>>> to a
>>> > > >> >> > > > >>>> > > single flag
>>> > > >> >> > > > >>>> > >    in RLMM (using AND operator) that can serve
>>> the
>>> > > >> query -
>>> > > >> >> "Is
>>> > > >> >> > > all
>>> > > >> >> > > > >>>> the
>>> > > >> >> > > > >>>> > > transaction indexes empty for a partition?".
>>> > > >> >> > > > >>>> > >    If yes, then we can directly scan the
>>> local-log
>>> > for
>>> > > >> >> aborted
>>> > > >> >> > > > >>>> > > transactions.
>>> > > >> >> > > > >>>> > > 2. A partition is produced using the
>>> transactional
>>> > > >> >> producer.
>>> > > >> >> > The
>>> > > >> >> > > > >>>> > > assumption made is that
>>> > > >> >> > > > >>>> > >     the transaction will either commit/rollback
>>> > within
>>> > > >> 15
>>> > > >> >> > > minutes
>>> > > >> >> > > > >>>> > >     (default transaction.max.timeout.ms = 15
>>> mins),
>>> > > >> >> possibly
>>> > > >> >> > we
>>> > > >> >> > > > >>>> may have
>>> > > >> >> > > > >>>> > > to search only
>>> > > >> >> > > > >>>> > >     a few consecutive remote log segments to
>>> collect
>>> > > the
>>> > > >> >> > aborted
>>> > > >> >> > > > >>>> > > transactions.
>>> > > >> >> > > > >>>> > > 3. A partition is being produced with both
>>> normal
>>> > and
>>> > > >> >> > > > transactional
>>> > > >> >> > > > >>>> > > producers. In this case,
>>> > > >> >> > > > >>>> > >     we will be doing linear traversal.
>>> Maintaining a
>>> > > >> >> skip-list
>>> > > >> >> > > > might
>>> > > >> >> > > > >>>> > > improve the performance but
>>> > > >> >> > > > >>>> > >     we delegate the RLMM implementation to
>>> users. If
>>> > > >> >> > implemented
>>> > > >> >> > > > >>>> > > incorrectly, then it can lead
>>> > > >> >> > > > >>>> > >     to delivery of the aborted transaction
>>> records
>>> > to
>>> > > >> the
>>> > > >> >> > > > consumer.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > I notice two drawbacks with the reduction
>>> method as
>>> > > >> >> proposed
>>> > > >> >> > in
>>> > > >> >> > > > the
>>> > > >> >> > > > >>>> KIP:
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > 1. Even if one segment has a transaction index,
>>> then
>>> > > we
>>> > > >> >> have
>>> > > >> >> > to
>>> > > >> >> > > > >>>> iterate
>>> > > >> >> > > > >>>> > > over all the metadata events.
>>> > > >> >> > > > >>>> > > 2. Assume that there are 10 segments and
>>> segment-5
>>> > > has a
>>> > > >> >> txn
>>> > > >> >> > > > index.
>>> > > >> >> > > > >>>> Once
>>> > > >> >> > > > >>>> > > the first 6 segments are deleted,
>>> > > >> >> > > > >>>> > >     due to breach by time/size/start-offset,
>>> then we
>>> > > >> should
>>> > > >> >> > > return
>>> > > >> >> > > > >>>> `true`
>>> > > >> >> > > > >>>> > > for "Is all the transaction indexes empty for a
>>> > > >> partition?"
>>> > > >> >> > > > >>>> > >    query but it will return `false` until the
>>> broker
>>> > > >> gets
>>> > > >> >> > > > restarted
>>> > > >> >> > > > >>>> and
>>> > > >> >> > > > >>>> > we
>>> > > >> >> > > > >>>> > > have to resort to iterate over all the metadata
>>> > > events.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > > 5. Potential alternative#2 - We know that we
>>> may
>>> > > want
>>> > > >> the
>>> > > >> >> > > > indexes
>>> > > >> >> > > > >>>> of
>>> > > >> >> > > > >>>> > > multiple higher segments. Instead of fetching
>>> them
>>> > > >> >> > sequentially,
>>> > > >> >> > > > we
>>> > > >> >> > > > >>>> could
>>> > > >> >> > > > >>>> > > implement a parallel fetch or a pre-fetch for
>>> the
>>> > > >> indexes.
>>> > > >> >> > This
>>> > > >> >> > > > >>>> would
>>> > > >> >> > > > >>>> > help
>>> > > >> >> > > > >>>> > > hide the latency of sequentially fetching the
>>> trx
>>> > > >> indexes.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > We can implement parallel-fetch/prefetch once
>>> the
>>> > > tiered
>>> > > >> >> > storage
>>> > > >> >> > > > is
>>> > > >> >> > > > >>>> GAed.
>>> > > >> >> > > > >>>> > > Since this feature will be useful
>>> > > >> >> > > > >>>> > > to prefetch the next remote log segment and it
>>> > expands
>>> > > >> the
>>> > > >> >> > scope
>>> > > >> >> > > > of
>>> > > >> >> > > > >>>> the
>>> > > >> >> > > > >>>> > > work.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > > 6. Should the proposed API take "segmentId"
>>> as a
>>> > > >> >> parameter
>>> > > >> >> > > > >>>> instead of
>>> > > >> >> > > > >>>> > > "topicIdPartition"? Suggesting because
>>> isTxnIdEmpty
>>> > is
>>> > > >> not
>>> > > >> >> a
>>> > > >> >> > > > >>>> property of
>>> > > >> >> > > > >>>> > a
>>> > > >> >> > > > >>>> > > partition, instead it's a property of a specific
>>> > > >> segment.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > We propose to use the `topicIdPartition` in
>>> > > >> >> > > > >>>> RemoteLogMetadataManager.
>>> > > >> >> > > > >>>> > > The implementation can fold/reduce the value of
>>> the
>>> > > >> >> individual
>>> > > >> >> > > log
>>> > > >> >> > > > >>>> > segment
>>> > > >> >> > > > >>>> > > `isTxnIdEmpty` flag. This is added to avoid
>>> scanning
>>> > > all
>>> > > >> >> the
>>> > > >> >> > > > >>>> metadata
>>> > > >> >> > > > >>>> > > events
>>> > > >> >> > > > >>>> > > when the partition does not have a transaction
>>> index
>>> > > in
>>> > > >> >> any of
>>> > > >> >> > > the
>>> > > >> >> > > > >>>> > > segments.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <
>>> > > >> >> > > > >>>> divijvaidy...@gmail.com>
>>> > > >> >> > > > >>>> > > wrote:
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > >> Hi Kamal
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> Thanks for bringing this up. This is a problem
>>> > worth
>>> > > >> >> solving.
>>> > > >> >> > > We
>>> > > >> >> > > > >>>> have
>>> > > >> >> > > > >>>> > >> faced
>>> > > >> >> > > > >>>> > >> this in situations where some Kafka clients
>>> default
>>> > > to
>>> > > >> >> > > > >>>> read_committed
>>> > > >> >> > > > >>>> > mode
>>> > > >> >> > > > >>>> > >> and end up having high latencies for remote
>>> fetches
>>> > > >> due to
>>> > > >> >> > this
>>> > > >> >> > > > >>>> > traversal
>>> > > >> >> > > > >>>> > >> across all segments.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> First some nits to clarify the KIP:
>>> > > >> >> > > > >>>> > >> 1. The motivation should make it clear that
>>> > traversal
>>> > > >> of
>>> > > >> >> all
>>> > > >> >> > > > >>>> segments is
>>> > > >> >> > > > >>>> > >> only in the worst case. If I am not mistaken
>>> > (please
>>> > > >> >> correct
>>> > > >> >> > me
>>> > > >> >> > > > if
>>> > > >> >> > > > >>>> > wrong),
>>> > > >> >> > > > >>>> > >> the traversal stops when it has found a segment
>>> > > >> containing
>>> > > >> >> > LSO.
>>> > > >> >> > > > >>>> > >> 2. There is nothing like a non-txn topic. A
>>> > > transaction
>>> > > >> >> may
>>> > > >> >> > be
>>> > > >> >> > > > >>>> started
>>> > > >> >> > > > >>>> > on
>>> > > >> >> > > > >>>> > >> any topic. Perhaps, rephrase the statement in
>>> the
>>> > KIP
>>> > > >> so
>>> > > >> >> that
>>> > > >> >> > > it
>>> > > >> >> > > > is
>>> > > >> >> > > > >>>> > clear
>>> > > >> >> > > > >>>> > >> to the reader.
>>> > > >> >> > > > >>>> > >> 3. The hyperlink in the "the broker has to
>>> traverse
>>> > > all
>>> > > >> >> > the..."
>>> > > >> >> > > > >>>> seems
>>> > > >> >> > > > >>>> > >> incorrect. Did you want to point to
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>>
>>> > > >> >> > > >
>>> > > >> >> > >
>>> > > >> >> >
>>> > > >> >>
>>> > > >>
>>> > >
>>> >
>>> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
>>> > > >> >> > > > >>>> > >> ?
>>> > > >> >> > > > >>>> > >> 4. In the testing section, could we add a test
>>> > plan?
>>> > > >> For
>>> > > >> >> > > > example, I
>>> > > >> >> > > > >>>> > would
>>> > > >> >> > > > >>>> > >> list down adding a test which would verify the
>>> > number
>>> > > >> of
>>> > > >> >> > calls
>>> > > >> >> > > > >>>> made to
>>> > > >> >> > > > >>>> > >> RLMM. This test would have a higher number of
>>> calls
>>> > > >> >> earlier
>>> > > >> >> > vs.
>>> > > >> >> > > > >>>> after
>>> > > >> >> > > > >>>> > this
>>> > > >> >> > > > >>>> > >> KIP.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> Other thoughts:
>>> > > >> >> > > > >>>> > >> 4. Potential alternative - Instead of having an
>>> > > >> algorithm
>>> > > >> >> > where
>>> > > >> >> > > > we
>>> > > >> >> > > > >>>> > >> traverse
>>> > > >> >> > > > >>>> > >> across segment metadata and looking for
>>> > isTxnIdxEmpty
>>> > > >> >> flag,
>>> > > >> >> > > > should
>>> > > >> >> > > > >>>> we
>>> > > >> >> > > > >>>> > >> directly introduce a nextSegmentWithTrxInx()
>>> > > function?
>>> > > >> >> This
>>> > > >> >> > > would
>>> > > >> >> > > > >>>> allow
>>> > > >> >> > > > >>>> > >> implementers to optimize the otherwise linear
>>> scan
>>> > > >> across
>>> > > >> >> > > > metadata
>>> > > >> >> > > > >>>> for
>>> > > >> >> > > > >>>> > all
>>> > > >> >> > > > >>>> > >> segments by using techniques such as skip list
>>> etc.
>>> > > >> >> > > > >>>> > >> 5. Potential alternative#2 - We know that we
>>> may
>>> > want
>>> > > >> the
>>> > > >> >> > > indexes
>>> > > >> >> > > > >>>> of
>>> > > >> >> > > > >>>> > >> multiple higher segments. Instead of fetching
>>> them
>>> > > >> >> > > sequentially,
>>> > > >> >> > > > we
>>> > > >> >> > > > >>>> > could
>>> > > >> >> > > > >>>> > >> implement a parallel fetch or a pre-fetch for
>>> the
>>> > > >> indexes.
>>> > > >> >> > This
>>> > > >> >> > > > >>>> would
>>> > > >> >> > > > >>>> > help
>>> > > >> >> > > > >>>> > >> hide the latency of sequentially fetching the
>>> trx
>>> > > >> indexes.
>>> > > >> >> > > > >>>> > >> 6. Should the proposed API take "segmentId" as
>>> a
>>> > > >> parameter
>>> > > >> >> > > > instead
>>> > > >> >> > > > >>>> of
>>> > > >> >> > > > >>>> > >> "topicIdPartition"? Suggesting because
>>> isTxnIdEmpty
>>> > > is
>>> > > >> >> not a
>>> > > >> >> > > > >>>> property
>>> > > >> >> > > > >>>> > of a
>>> > > >> >> > > > >>>> > >> partition, instead it's a property of a
>>> specific
>>> > > >> segment.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> Looking forward to hearing your thoughts about
>>> the
>>> > > >> >> > > alternatives.
>>> > > >> >> > > > >>>> Let's
>>> > > >> >> > > > >>>> > get
>>> > > >> >> > > > >>>> > >> this fixed.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> --
>>> > > >> >> > > > >>>> > >> Divij Vaidya
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal
>>> > > Chandraprakash <
>>> > > >> >> > > > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> > Hi all,
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > I have opened a KIP-1058
>>> > > >> >> > > > >>>> > >> > <
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>>
>>> > > >> >> > > >
>>> > > >> >> > >
>>> > > >> >> >
>>> > > >> >>
>>> > > >>
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>>> > > >> >> > > > >>>> > >> > >
>>> > > >> >> > > > >>>> > >> > to reduce the pressure on remote storage when
>>> > > >> >> transactional
>>> > > >> >> > > > >>>> consumers
>>> > > >> >> > > > >>>> > >> are
>>> > > >> >> > > > >>>> > >> > reading non-txn topics from remote storage.
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>>
>>> > > >> >> > > >
>>> > > >> >> > >
>>> > > >> >> >
>>> > > >> >>
>>> > > >>
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > Feedback and suggestions are welcome.
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > Thanks,
>>> > > >> >> > > > >>>> > >> > Kamal
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > >
>>> > > >> >> > >
>>> > > >> >> >
>>> > > >> >>
>>> > > >> >
>>> > > >>
>>> > > >
>>> > >
>>> >
>>>
>>
