Thanks for patiently addressing all the comments. I will add the vote in the other thread.
--
Divij Vaidya

On Mon, Nov 4, 2024 at 5:40 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi all,

If the review comments are addressed and the KIP looks good, please submit your vote on the voting thread.

Thanks,
Kamal

On Sat, Nov 2, 2024 at 7:52 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

I've opened a PR which adds an integration test
<https://github.com/apache/kafka/pull/17668/files#diff-a60b518846fc0f770164d55b6d7cd31e03a002514377578c80c6e22cc120af40R88>
to show the impact of this change/KIP. PTAL.

--
Kamal

On Fri, Nov 1, 2024 at 3:31 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Thanks for the review!

> 5
Updated the KIP to include the `isEmpty` method in the transaction index.

> 6
You're right. The offset parameter will be equal to the next-segment-to-consider's base offset. But the API introduced in RLMM is for *one* epoch, and the next epoch's start offset may not be the base offset. You can refer to this line
<https://github.com/apache/kafka/pull/17659/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1857>
in the draft PR.

On Fri, Nov 1, 2024 at 3:16 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Last few things -

#5
About setting the TrxIndexEmpty field, could we introduce an isEmpty() function in TransactionIndex with the following implementation:

    public boolean isEmpty() {
        return !iterable().iterator().hasNext();
    }

The advantages of this approach are:
1. It works both when the file is not present and when the file is present but empty.
2. It prevents leaking the underlying implementation of TransactionIndex outside via the file() method.
   I think that making file() public is an implementation leak (for example, what if the txn index is not file-based!).

#6
In the documentation for nextSegmentWithTxnIndex, the offset parameter should be equal to the next-segment-to-consider's base offset, no? I assume that we will add a new fetch here with nextSegmentBaseOffset:
https://github.com/apache/kafka/blob/346fdbafc539bc48bb66eedae89a15e240007fd9/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1801
Is there a case where the parameter "offset" will not be equal to the baseOffset of a segment?

--
Divij Vaidya

On Fri, Nov 1, 2024 at 10:26 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Divij,

Thanks for the detailed review!

> 1, 2, 3, 4
Updated KIP-1058
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions>
with the feedback received, and also opened draft PR #17659
<https://github.com/apache/kafka/pull/17659/files> for reference. PTAL.

> 5. How are we determining the value of the TrxIndexEmpty field on segment rotation?
The transaction index file is optional: the file does not exist when there are no aborted txn entries for a segment, so we will use a null check on the file. I also updated this in the KIP.

Thanks,
Kamal

On Tue, Oct 29, 2024 at 8:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

A few more points to discuss (please add them to the KIP as well):

5. How are we determining the value of the TrxIndexEmpty field on segment rotation?
One option is to do

    boolean txnIdxEmpty = segment.txnIndex().allAbortedTxns().isEmpty();

but this has the overhead of reading the contents of the file and storing them in memory when the index is non-empty.
The other (preferred) option is to add a public isEmpty() method to TransactionIndex and perform a segment.txnIndex().isEmpty() check, which will internally use the Files.size() Java API.

On Tue, Oct 29, 2024 at 1:21 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Let's get the ball rolling (again) on this one.

Kamal, could you please add the following to the KIP:
1. The API as discussed above. Please also add the failure modes for this API, such as the exceptions thrown, and a recommendation on how a caller is expected to handle them. I am assuming that the three parameters for this API will be topicPartition, epoch and offset.
2. Implementation details for the Topic-based RLMM. I am assuming that the plugin will default the field to false if it is absent (the case of old metadata).
3. In the test plan section, additionally, we need to assert that we don't read metadata for all segments (i.e. it is not a linear search) from the Topic-based RLMM.
4. In the compatibility section, please document how existing clusters with Tiered Storage metadata will work during/after a rolling upgrade to a version which contains this change.
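For illustration, the Files.size()-based option from point 5 could look roughly like the sketch below. FileBackedTxnIndex is a hypothetical stand-in, not Kafka's TransactionIndex; it assumes the index lives in a single file at a known Path and treats a missing file the same as a zero-length one, since the transaction index file is optional.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical stand-in for a file-backed transaction index (not Kafka's
// TransactionIndex). Only the emptiness check is modelled here.
final class FileBackedTxnIndex {
    private final Path file;

    FileBackedTxnIndex(Path file) {
        this.file = file;
    }

    // A missing file and a zero-length file both mean "no aborted
    // transactions recorded for this segment". Files.size() reads only file
    // attributes, so the index entries are never loaded into memory.
    boolean isEmpty() {
        try {
            return Files.size(file) == 0L;
        } catch (NoSuchFileException e) {
            return true; // the index file is optional and may not exist
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

At segment rotation the flag would then be something like `boolean txnIdxEmpty = index.isEmpty();`, which avoids the in-memory copy that the allAbortedTxns() option needs.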
--
Divij Vaidya

On Fri, Oct 11, 2024 at 12:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bump for review.

If the additional proposal looks good, I'll append it to the KIP. PTAL.

New API in RLMM#nextRemoteLogSegmentMetadataWithTxnIndex

--
Kamal

On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Christo,

Thanks for the review!

Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM helps to reduce the complexity of the linear search. With this API, we have to:

1. Maintain one more skip-list [1] for each of the epochs in the partition in RLMM, which might increase the memory usage of the TopicBased RLMM implementation.
   1a) The skip-list will be empty when there are no aborted txn entries for a partition/epoch, which is the predominant case.
   1b) The skip-list will act as a duplicate when *most* of the segments have aborted txn entries; assuming aborted txns are rare, this should be fine.
2. Change the logic to retrieve the aborted txns (we have to query nextRLSMWithTxnIndex for each leader epoch).
3. The logic diverges from how we retrieve the aborted txn entries from the local log.

The approach looks good to me.
If everyone is aligned, then we can proceed to add this API to RLMM.

Another option I was thinking of is to capture the `lastStableOffsetLag` [2] while rotating the segment. But that is a bigger change we can take on later.

[1]: https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
[2]: https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432

Thanks,
Kamal

On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com> wrote:

Heya,

Apologies for the delay. I have been thinking about this problem recently as well, and while I believe storing a boolean in the metadata is good, I think we can do better by introducing a new method to the RLMM along the lines of

    Optional<RemoteLogSegmentMetadata> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition,
            int epochForOffset, long offset) throws RemoteStorageException

This will help plugin implementers build optimisations such as skip lists, which will give them the next segment quicker than a linear search.

I am keen to hear your thoughts!
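A rough sketch of how a plugin might back such a method with one skip list per leader epoch that holds only the segments whose transaction index is non-empty. SegmentMeta, TxnIndexLookup and nextWithTxnIndex are hypothetical names, not the RLMM interface; the sketch assumes segments within an epoch do not overlap, attributes each segment to a single epoch, and leaves out the RemoteStorageException failure mode. Because an epoch's start offset may fall inside a segment rather than on its base offset, the lookup first checks the segment containing `offset` before moving to the next one.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, trimmed-down segment metadata (not RemoteLogSegmentMetadata).
record SegmentMeta(String id, long baseOffset, long endOffset, boolean txnIdxEmpty) {}

// Hypothetical per-partition lookup a plugin could keep inside its RLMM.
final class TxnIndexLookup {
    // leader epoch -> (base offset -> segment), holding only segments whose
    // transaction index is non-empty; usually empty for non-txn workloads.
    private final Map<Integer, NavigableMap<Long, SegmentMeta>> byEpoch = new ConcurrentHashMap<>();

    void onSegmentAdded(int epoch, SegmentMeta meta) {
        if (!meta.txnIdxEmpty()) {
            byEpoch.computeIfAbsent(epoch, e -> new ConcurrentSkipListMap<>())
                   .put(meta.baseOffset(), meta);
        }
    }

    void onSegmentDeleted(int epoch, SegmentMeta meta) {
        NavigableMap<Long, SegmentMeta> segments = byEpoch.get(epoch);
        if (segments != null) {
            segments.remove(meta.baseOffset());
        }
    }

    // First segment in `epoch` with a non-empty transaction index that
    // contains `offset` or starts after it: O(log n) rather than a linear scan.
    Optional<SegmentMeta> nextWithTxnIndex(int epoch, long offset) {
        NavigableMap<Long, SegmentMeta> segments = byEpoch.get(epoch);
        if (segments == null) {
            return Optional.empty();
        }
        // `offset` may fall inside a segment (e.g. an epoch's start offset).
        Map.Entry<Long, SegmentMeta> floor = segments.floorEntry(offset);
        if (floor != null && floor.getValue().endOffset() >= offset) {
            return Optional.of(floor.getValue());
        }
        Map.Entry<Long, SegmentMeta> next = segments.ceilingEntry(offset);
        return next == null ? Optional.empty() : Optional.of(next.getValue());
    }
}
```

Removing deleted segments from the map keeps the answer accurate after retention kicks in, at the cost of the extra per-epoch structure.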
Best,
Christo

On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Luke,

Thanks for the review!

> Do you think it would be helpful to store the "least abort start offset in the segment", with -1 meaning no txnIndex? That way we would know whether we need to fetch this txn index or not.

1. No, this change won't have an effect. To find the upper-bound offset [1], we have to fetch that segment's offset index file. The RemoteIndexCache [2] fetches all 3 index files together and caches them for subsequent use, so this improvement won't have an effect, as the current segment's txn index gets downloaded anyway.

2. The reason for choosing a boolean is to make the change backward compatible. There can be existing RLM events for the uploaded segments. The default value of `txnIdxEmpty` is false, so the *old* RLM events are assumed to contain the txn index files, and those files are downloaded if they exist.
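To make point 2 concrete, here is a simplified sketch of how a reader could use the flag. It assumes a hypothetical RemoteSegment record in which events written before this change (no txnIdxEmpty field) default to false and therefore still have their index fetched; the scan over [startOffset, upperBoundOffset) is a simplification of collecting aborted transactions, not the RemoteLogManager code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified remote segment metadata.
record RemoteSegment(long baseOffset, long endOffset, boolean txnIdxEmpty) {
    // Events written before the field existed carry no txnIdxEmpty value.
    // Reading them as false ("an index may exist") keeps today's behaviour.
    RemoteSegment(long baseOffset, long endOffset) {
        this(baseOffset, endOffset, false);
    }
}

final class AbortedTxnScan {
    // Segments (sorted by offset) whose transaction index would still need a
    // remote fetch to collect aborted transactions in [startOffset,
    // upperBoundOffset). Segments flagged empty are skipped with no remote call.
    static List<RemoteSegment> indexesToFetch(List<RemoteSegment> segments,
                                              long startOffset,
                                              long upperBoundOffset) {
        List<RemoteSegment> toFetch = new ArrayList<>();
        for (RemoteSegment s : segments) {
            if (s.endOffset() < startOffset) {
                continue; // ends before the range
            }
            if (s.baseOffset() >= upperBoundOffset) {
                break; // starts at or after the upper bound
            }
            if (!s.txnIdxEmpty()) {
                toFetch.add(s);
            }
        }
        return toFetch;
    }
}
```

The scan is still linear over the metadata, but the remote index downloads drop to the segments that may actually hold aborted transactions.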
[1]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
[2]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383

Thanks,
Kamal

On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:

Hi Kamal,

Sorry for the late review.
Thanks for the KIP, this will improve transactional reads from remote storage.
Overall LGTM, just one minor thought:

Currently, we only store the `TxnIndexEmpty` bool value in the segment metadata. Do you think it would be helpful to store the "least abort start offset in the segment", with -1 meaning no txnIndex? That way we would know whether we need to fetch this txn index or not.

Thanks.
Luke

On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi all,

If there are no more comments, I'll start a voting thread soon.
Thanks,
Kamal

On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bumping this thread again for review!

I've reduced the scope of the proposal to the minimum. We will be adding only one field (txnIdxEmpty) to the RemoteLogSegmentMetadata event, which is backward compatible. PTAL.

Thanks,
Kamal

On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bumping this thread for KIP review!

We can go for the simplest solution proposed in this KIP and improve it in a subsequent iteration. PTAL.

Thanks,
Kamal

On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Divij,

Thanks for the review! And sorry for the late reply.
From the UnifiedLog.scala
<https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427>
doc:

"""
The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
 * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
 * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
 * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
 * beyond the high watermark.
"""

While rolling the active segment to passive, if the LSO equals the HW, then all the messages in that segment are decided, and we can store the `lastStableOffsetLag` as an attribute of the rolled segment. We can then propagate the `lastStableOffsetLag` information in the RemoteLogMetadata events.
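A minimal sketch of the roll-time attribute described above, assuming the lag is computed as high watermark minus LSO (so it can never be negative, since the LSO cannot advance beyond the HW). RollSnapshot and atRoll are hypothetical names used only for illustration.

```java
// Hypothetical attribute captured when the active segment is rolled.
record RollSnapshot(long baseOffset, long lastStableOffsetLag) {
    RollSnapshot {
        // The LSO cannot advance beyond the HW, so the lag is never negative.
        if (lastStableOffsetLag < 0) {
            throw new IllegalArgumentException("LSO cannot be ahead of the high watermark");
        }
    }

    // Assumption: lag = high watermark - last stable offset at roll time.
    static RollSnapshot atRoll(long baseOffset, long highWatermark, long lastStableOffset) {
        return new RollSnapshot(baseOffset, highWatermark - lastStableOffset);
    }

    // A lag of 0 means every offset below the HW was "decided" when the
    // segment was rolled, i.e. no transaction was still open at that point.
    boolean allDecidedAtRoll() {
        return lastStableOffsetLag == 0;
    }
}
```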
While reading the remote log segment, if the `lastStableOffsetLag` is 0, then there is no need to traverse the subsequent segments for aborted transactions. This covers the dominant case where the partition had no transactions at all.

With log compaction, the shrunk segments might get merged. One option is to take the max of `lastStableOffsetLag` and store it in the new LogSegment. Since tiered storage does not support compacted topics / historically compacted topics, we can omit this case.

If this approach looks good, I can update the KIP with the details.

--
Kamal

On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Hi Kamal

Thanks for the bump. I have been thinking about this passively for the past few days.

The simplest solution is to store a state in the segment-level metadata.
The state should specify whether the trx index is empty or not. It would be populated during segment archival. We would then iterate over the metadata for future segments without having to make a remote call to download the trx index itself.

The other solution, storing state at the partition level, wouldn't work, as you mentioned, because we would have to change the state on every mutation to the log, i.e. on expiration of segments and on append.

I have been thinking about whether we can do something better than the simple solution, hence the delay in replying. Let me share my half-baked train of thought; perhaps you can explore this as well. I have been thinking about using the LSO (last stable offset) to handle the case when the partition never had any transactions.
For a partition which never had any transaction, I would assume that the LSO is never initialized (or is equal to the log start offset)? Or is it equal to the HW in that case? This is something that I am yet to verify. If this idea works, then we would not have to iterate through the metadata for the dominant case where the partition had no transactions at all.

--
Divij Vaidya

On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bump. Please review this proposal.

On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Divij,

Thanks for the review! Updated the KIP with review comments 1, 2, 3, and 4.

> 4. Potential alternative - Instead of having an algorithm where we traverse across segment metadata looking for the isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across metadata for all segments by using techniques such as skip lists etc.

This is a good point for optimizing the scan. We would need to maintain the skip-list for each leader epoch. With unclean leader election, some brokers may not have the complete lineage. This will expand the scope of the work.

In this version, we plan to optimize only for the below 2 cases:

1. A partition does not have the transaction index for any of the uploaded segments.
   The individual log segments' `isTxnIdxEmpty` flags can be reduced to a single flag in RLMM (using the AND operator) that can serve the query "Are all the transaction indexes empty for a partition?". If yes, then we can directly scan the local log for aborted transactions.
2. A partition is produced using the transactional producer. The assumption made is that the transaction will either commit or roll back within 15 minutes (default transaction.max.timeout.ms = 15 mins), so we may have to search only a few consecutive remote log segments to collect the aborted transactions.
3. A partition is being produced with both normal and transactional producers. In this case, we will be doing a linear traversal. Maintaining a skip-list might improve the performance, but we delegate the RLMM implementation to users.
   If implemented incorrectly, it can lead to delivery of aborted transaction records to the consumer.

I notice two drawbacks with the reduction method as proposed in the KIP:

1. Even if only one segment has a transaction index, we have to iterate over all the metadata events.
2. Assume that there are 10 segments and segment-5 has a txn index. Once the first 6 segments are deleted due to a time/size/start-offset breach, we should return `true` for the "Are all the transaction indexes empty for a partition?" query, but it will return `false` until the broker gets restarted, and we have to resort to iterating over all the metadata events.

> 5. Potential alternative#2 - We know that we may want the indexes of multiple higher segments.
Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.

We can implement parallel fetch/prefetch once tiered storage is GA: this feature would also be useful for prefetching the next remote log segment, and it expands the scope of the work.

> 6. Should the proposed API take "segmentId" as a parameter instead of "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property of a partition; instead, it's a property of a specific segment.

We propose to use the `topicIdPartition` in RemoteLogMetadataManager. The implementation can fold/reduce the values of the individual log segments' `isTxnIdEmpty` flags.
This is added to avoid scanning all the metadata events when the partition does not have a transaction index in any of the segments.

On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Hi Kamal

Thanks for bringing this up. This is a problem worth solving. We have faced this in situations where some Kafka clients default to read_committed mode and end up with high latencies for remote fetches due to this traversal across all segments.

First some nits to clarify the KIP:
1. The motivation should make it clear that traversal of all segments happens only in the worst case. If I am not mistaken (please correct me if wrong), the traversal stops when it has found a segment containing the LSO.
2. There is nothing like a non-txn topic.
A > >>> > > transaction > >>> > > >> >> may > >>> > > >> >> > be > >>> > > >> >> > > > >>>> started > >>> > > >> >> > > > >>>> > on > >>> > > >> >> > > > >>>> > >> any topic. Perhaps, rephrase the statement in > >>> the > >>> > KIP > >>> > > >> so > >>> > > >> >> that > >>> > > >> >> > > it > >>> > > >> >> > > > is > >>> > > >> >> > > > >>>> > clear > >>> > > >> >> > > > >>>> > >> to the reader. > >>> > > >> >> > > > >>>> > >> 3. The hyperlink in the "the broker has to > >>> traverse > >>> > > all > >>> > > >> >> > the..." > >>> > > >> >> > > > >>>> seems > >>> > > >> >> > > > >>>> > >> incorrect. Did you want to point to > >>> > > >> >> > > > >>>> > >> > >>> > > >> >> > > > >>>> > >> > >>> > > >> >> > > > >>>> > > >>> > > >> >> > > > >>>> > >>> > > >> >> > > > > >>> > > >> >> > > > >>> > > >> >> > > >>> > > >> >> > >>> > > >> > >>> > > > >>> > > >>> > https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444 > >>> > > >> >> > > > >>>> > >> ? > >>> > > >> >> > > > >>>> > >> 4. In the testing section, could we add a > test > >>> > plan? > >>> > > >> For > >>> > > >> >> > > > example, I > >>> > > >> >> > > > >>>> > would > >>> > > >> >> > > > >>>> > >> list down adding a test which would verify > the > >>> > number > >>> > > >> of > >>> > > >> >> > calls > >>> > > >> >> > > > >>>> made to > >>> > > >> >> > > > >>>> > >> RLMM. This test would have a higher number of > >>> calls > >>> > > >> >> earlier > >>> > > >> >> > vs. > >>> > > >> >> > > > >>>> after > >>> > > >> >> > > > >>>> > this > >>> > > >> >> > > > >>>> > >> KIP. > >>> > > >> >> > > > >>>> > >> > >>> > > >> >> > > > >>>> > >> Other thoughts: > >>> > > >> >> > > > >>>> > >> 4. 
Potential alternative - Instead of having > an > >>> > > >> algorithm > >>> > > >> >> > where > >>> > > >> >> > > > we > >>> > > >> >> > > > >>>> > >> traverse > >>> > > >> >> > > > >>>> > >> across segment metadata and looking for > >>> > isTxnIdxEmpty > >>> > > >> >> flag, > >>> > > >> >> > > > should > >>> > > >> >> > > > >>>> we > >>> > > >> >> > > > >>>> > >> directly introduce a nextSegmentWithTrxInx() > >>> > > function? > >>> > > >> >> This > >>> > > >> >> > > would > >>> > > >> >> > > > >>>> allow > >>> > > >> >> > > > >>>> > >> implementers to optimize the otherwise linear > >>> scan > >>> > > >> across > >>> > > >> >> > > > metadata > >>> > > >> >> > > > >>>> for > >>> > > >> >> > > > >>>> > all > >>> > > >> >> > > > >>>> > >> segments by using techniques such as skip > list > >>> etc. > >>> > > >> >> > > > >>>> > >> 5. Potential alternative#2 - We know that we > >>> may > >>> > want > >>> > > >> the > >>> > > >> >> > > indexes > >>> > > >> >> > > > >>>> of > >>> > > >> >> > > > >>>> > >> multiple higher segments. Instead of fetching > >>> them > >>> > > >> >> > > sequentially, > >>> > > >> >> > > > we > >>> > > >> >> > > > >>>> > could > >>> > > >> >> > > > >>>> > >> implement a parallel fetch or a pre-fetch for > >>> the > >>> > > >> indexes. > >>> > > >> >> > This > >>> > > >> >> > > > >>>> would > >>> > > >> >> > > > >>>> > help > >>> > > >> >> > > > >>>> > >> hide the latency of sequentially fetching the > >>> trx > >>> > > >> indexes. > >>> > > >> >> > > > >>>> > >> 6. Should the proposed API take "segmentId" > as > >>> a > >>> > > >> parameter > >>> > > >> >> > > > instead > >>> > > >> >> > > > >>>> of > >>> > > >> >> > > > >>>> > >> "topicIdPartition"? Suggesting because > >>> isTxnIdEmpty > >>> > > is > >>> > > >> >> not a > >>> > > >> >> > > > >>>> property > >>> > > >> >> > > > >>>> > of a > >>> > > >> >> > > > >>>> > >> partition, instead it's a property of a > >>> specific > >>> > > >> segment. 
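To make alternative 4 concrete, here is a minimal Java sketch, assuming a simplified in-memory view of remote segment metadata with non-overlapping segments from a single lineage. The record `SegmentMeta`, the class `TxnIndexLookupSketch` and the method `nextSegmentWithTxnIndex` (standing in for the proposed `nextSegmentWithTrxInx()`) are hypothetical names for illustration, not the RLMM API. Only segments whose transaction index is non-empty are kept in a `ConcurrentSkipListMap` (the JDK's skip-list map) keyed by end offset, so `ceilingEntry` finds the next such segment in O(log n) instead of a linear scan over every segment's metadata.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

public class TxnIndexLookupSketch {

    // Hypothetical, simplified view of remote segment metadata (not the real RLMM type).
    public record SegmentMeta(String segmentId, long baseOffset, long endOffset, boolean txnIndexEmpty) {}

    // Baseline: linear scan over every segment's metadata, assuming the list is in offset order.
    public static Optional<SegmentMeta> linearScan(List<SegmentMeta> segments, long offset) {
        for (SegmentMeta s : segments) {
            if (s.endOffset() >= offset && !s.txnIndexEmpty()) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }

    // Skip-list map holding only segments with a non-empty transaction index, keyed by end offset.
    private final ConcurrentSkipListMap<Long, SegmentMeta> withTxnIndex = new ConcurrentSkipListMap<>();

    public void add(SegmentMeta s) {
        if (!s.txnIndexEmpty()) {
            withTxnIndex.put(s.endOffset(), s);
        }
    }

    // Hypothetical nextSegmentWithTxnIndex: the first segment with a txn index whose end offset
    // is >= offset, found in O(log n) rather than by scanning every segment.
    public Optional<SegmentMeta> nextSegmentWithTxnIndex(long offset) {
        Map.Entry<Long, SegmentMeta> e = withTxnIndex.ceilingEntry(offset);
        return e == null ? Optional.empty() : Optional.of(e.getValue());
    }

    public static void main(String[] args) {
        List<SegmentMeta> segments = List.of(
                new SegmentMeta("s0", 0, 99, true),
                new SegmentMeta("s1", 100, 199, false),
                new SegmentMeta("s2", 200, 299, true),
                new SegmentMeta("s3", 300, 399, false));
        TxnIndexLookupSketch lookup = new TxnIndexLookupSketch();
        segments.forEach(lookup::add);
        System.out.println(lookup.nextSegmentWithTxnIndex(150).map(SegmentMeta::segmentId).orElse("none"));
        System.out.println(lookup.nextSegmentWithTxnIndex(250).map(SegmentMeta::segmentId).orElse("none"));
        System.out.println(lookup.nextSegmentWithTxnIndex(400).map(SegmentMeta::segmentId).orElse("none"));
    }
}
```

With the sample segments in `main`, the three lookups should print `s1`, `s3` and `none`, and `linearScan` returns the same segments for those offsets. That equivalence is the property any optimized implementation behind such an API would need to preserve.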

Looking forward to hearing your thoughts about the alternatives. Let's get this fixed.

--
Divij Vaidya

On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi all,

I have opened KIP-1058 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic> to reduce the pressure on remote storage when transactional consumers are reading non-txn topics from remote storage.

Feedback and suggestions are welcome.

Thanks,
Kamal
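The pressure the KIP targets can be illustrated with a simplified model, which is an assumption and not Kafka's actual `RemoteLogManager` logic: starting at the fetch offset, walk segments in offset order, fetch each segment's transaction index from remote storage, and stop once a segment reaches the upper-bound offset. The names `AbortedTxnScanSketch`, `collectAborted` and the `txnIndexEmpty` field are hypothetical. When the upper bound lies beyond every remote segment (the worst case discussed above), every index is fetched; with a per-segment empty flag, the same walk performs no remote fetches for a partition that has no transactions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class AbortedTxnScanSketch {

    // Hypothetical segment view; txnIndexEmpty stands in for the per-segment flag discussed in the KIP.
    public record Segment(String id, long baseOffset, long endOffset, boolean txnIndexEmpty) {}

    // Which segments had their txn index fetched, plus the aborted-txn first offsets found.
    public record ScanResult(List<String> fetched, List<Long> abortedFirstOffsets) {}

    // Simplified model (not Kafka's RemoteLogManager): walk segments in offset order from
    // fetchOffset and stop at the first segment whose end offset reaches upperBoundOffset.
    public static ScanResult collectAborted(List<Segment> segments, long fetchOffset, long upperBoundOffset,
                                            boolean skipEmpty, Function<Segment, List<Long>> fetchTxnIndex) {
        List<String> fetched = new ArrayList<>();
        List<Long> aborted = new ArrayList<>();
        for (Segment s : segments) {
            if (s.endOffset() < fetchOffset) {
                continue; // segment lies entirely before the fetch offset
            }
            if (!(skipEmpty && s.txnIndexEmpty())) {
                fetched.add(s.id()); // simulated remote fetch of the txn index
                aborted.addAll(fetchTxnIndex.apply(s));
            }
            if (s.endOffset() >= upperBoundOffset) {
                break; // this segment covers the upper bound, so later segments are irrelevant
            }
        }
        return new ScanResult(fetched, aborted);
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment("s0", 0, 99, true),
                new Segment("s1", 100, 199, true),
                new Segment("s2", 200, 299, true));
        // Upper bound beyond the last remote segment: the worst case for the traversal.
        System.out.println(collectAborted(segments, 0, 1_000, false, s -> List.of()).fetched());
        System.out.println(collectAborted(segments, 0, 1_000, true, s -> List.of()).fetched());
    }
}
```

In `main`, the first call should print `[s0, s1, s2]` (every index fetched) and the second `[]` (no remote fetches), which is the kind of call-count difference the proposed test plan would check against RLMM.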