Thanks for the review!

> 5
Updated the KIP to include the `isEmpty` method in the transaction index.
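For reference, here is a minimal sketch of the two ways an `isEmpty()` check could behave, mirroring the options raised in this thread. `TxnIndexSketch` and its one-entry-per-line file layout are hypothetical stand-ins for illustration only; this is not the `TransactionIndex` code from the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;

// Hypothetical stand-in for a file-backed transaction index. This is NOT
// Kafka's TransactionIndex; entries are modelled as lines of text.
final class TxnIndexSketch {
    private final Path file; // may point at a file that was never created

    TxnIndexSketch(Path file) {
        this.file = file;
    }

    // Returns the entries; a missing file yields no entries. For brevity the
    // sketch reads the whole file eagerly.
    Iterable<String> iterable() {
        if (Files.notExists(file)) {
            return Collections.emptyList();
        }
        try {
            return Files.readAllLines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Option from the review: empty when there is no first entry. Covers both
    // "file absent" and "file present but empty" without exposing the file.
    boolean isEmpty() {
        return !iterable().iterator().hasNext();
    }

    // Alternative: decide from the file size alone, without reading entries.
    boolean isEmptyBySize() {
        try {
            return Files.notExists(file) || Files.size(file) == 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("txn-index-sketch");
        TxnIndexSketch missing = new TxnIndexSketch(dir.resolve("missing.txnindex"));
        TxnIndexSketch empty = new TxnIndexSketch(Files.createFile(dir.resolve("empty.txnindex")));
        TxnIndexSketch full = new TxnIndexSketch(
                Files.write(dir.resolve("full.txnindex"), "producerId=1,firstOffset=10\n".getBytes()));

        System.out.println("missing: " + missing.isEmpty() + " / " + missing.isEmptyBySize());
        System.out.println("empty:   " + empty.isEmpty() + " / " + empty.isEmptyBySize());
        System.out.println("full:    " + full.isEmpty() + " / " + full.isEmptyBySize());
    }
}
```

Both variants agree for the missing-file and empty-file cases; the iterator-based one does not assume that the index is backed by a file at all.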

> 6
You're right. The offset parameter will be equal to the next-segment-to-consider base offset. But, the API introduced in RLMM is for *one* epoch. The next epoch's start offset may not be the base offset. You can refer to this line <https://github.com/apache/kafka/pull/17659/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1857> in the draft PR.

On Fri, Nov 1, 2024 at 3:16 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

> Last few things -
>
> # 5
> About setting the TrxIndexEmpty field, could we introduce an isEmpty() function in TransactionIndex which has the following implementation:
>
> public boolean isEmpty() {
>     return !iterable().iterator().hasNext();
> }
>
> The advantages of this approach are:
> 1. It works both when the file is not present and when the file is present but empty.
> 2. It prevents leaking the underlying implementation of TransactionIndex outside via the file() method. I think that making file() public is an implementation leak (for example, what if the trx index is not file-based?).
>
> # 6
> In the documentation for nextSegmentWithTxnIndex, the offset parameter should be equal to the next-segment-to-consider's base offset, no? I assume that we will add a new fetch here with nextSegmentBaseOffset
> https://github.com/apache/kafka/blob/346fdbafc539bc48bb66eedae89a15e240007fd9/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1801 .
> Is there a case where the parameter "offset" will not be equal to the baseOffset of a segment?
>
> --
> Divij Vaidya
>
> On Fri, Nov 1, 2024 at 10:26 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
>
> > Hi Divij,
> >
> > Thanks for the detailed review!
> >
> > > 1, 2, 3, 4
> > Updated the KIP-1058 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions> with the feedback received and also opened a draft PR #17659 <https://github.com/apache/kafka/pull/17659/files> for reference. PTAL.
> >
> > > 5. How are we determining the value of the TrxIndexEmpty field on segment rotation?
> > The transaction index file is optional; the file does not exist when there are no aborted txn entries for a segment, so we will be using the file null check. Also, updated it in the KIP.
> >
> > Thanks,
> > Kamal
> >
> > On Tue, Oct 29, 2024 at 8:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> >
> > > A few more points to discuss (please add to the KIP as well)
> > >
> > > 5. How are we determining the value of the TrxIndexEmpty field on segment rotation?
> > >
> > > One option is to do a boolean txnIdxEmpty = segment.txnIndex().allAbortedTxns().isEmpty() but this will have an overhead of reading the contents of the file and storing them in memory, when we have a non-empty index.
> > > The other option (preferred) is to add an isEmpty() public method to the TransactionIndex and perform a segment.txnIndex().isEmpty() check which will internally use Files.size() java API.
> > >
> > > On Tue, Oct 29, 2024 at 1:21 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > >
> > > > Let's get the ball rolling (again) on this one.
> > > >
> > > > Kamal, could you please add the following to the KIP:
> > > > 1. the API as discussed above. Please add the failure modes for this API as well such as the exceptions thrown and a recommendation on how a caller is expected to handle those. I am assuming that the three parameters for this API will be topicPartition, epoch and offset.
> > > > 2. implementation details for Topic based RLMM. I am assuming that the plugin will default the field to false if this field is absent (case of old metadata).
> > > > 3. In the test plan section, additionally, we need to assert that we don't read metadata for all segments (i.e. it is not a linear search) from the Topic based RLMM.
> > > > 4. in the compatibility section, please document how the existing clusters with Tiered Storage metadata will work during/after a rolling upgrade to a version which contains this new change.
> > > >
> > > > --
> > > > Divij Vaidya
> > > >
> > > > On Fri, Oct 11, 2024 at 12:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >
> > > >> Bump for review.
> > > >>
> > > >> If the additional proposal looks good, I'll append them to the KIP. PTAL.
> > > >>
> > > >> New API in RLMM#nextRemoteLogSegmentMetadataWithTxnIndex
> > > >>
> > > >> --
> > > >> Kamal
> > > >>
> > > >> On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >>
> > > >> > Hi Christo,
> > > >> >
> > > >> > Thanks for the review!
> > > >> >
> > > >> > Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM helps to reduce the complexity of linear search. With this API, we have to:
> > > >> >
> > > >> > 1. Maintain one more skip-list [1] for each of the epochs in the partition in RLMM that might increase the memory usage of TopicBased RLMM implementation.
> > > >> >    1a) The skip-list will be empty when there are no aborted txn entries for a partition/epoch which is the predominant case.
> > > >> >    1b) The skip-list will act as a duplicate when *most* of the segments have aborted txn entries, assuming aborted txn are quite low, this should be fine.
> > > >> > 2. Change the logic to retrieve the aborted txns (we have to query the nextRLSMWithTxnIndex for each of the leader-epoch).
> > > >> > 3. Logic divergence from how we retrieve the aborted txn entries compared to the local-log.
> > > >> >
> > > >> > The approach looks good to me. If everyone is aligned, then we can proceed to add this API to RLMM.
> > > >> >
> > > >> > Another option I was thinking of is to capture the `lastStableOffsetLag` [2] while rotating the segment. But, that is a bigger change we can take later.
> > > >> >
> > > >> > [1]: https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
> > > >> > [2]: https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432
> > > >> >
> > > >> > Thanks,
> > > >> > Kamal
> > > >> >
> > > >> > On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com> wrote:
> > > >> >
> > > >> >> Heya,
> > > >> >>
> > > >> >> Apologies for the delay. I have been thinking about this problem recently as well and while I believe storing a boolean in the metadata is good, I think we can do better by introducing a new method to the RLMM along the lines of
> > > >> >>
> > > >> >> Optional<RemoteLogSegmentMetadata> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition, int epochForOffset, long offset) throws RemoteStorageException
> > > >> >>
> > > >> >> This will help plugin implementers to build optimisations such as skip lists which will give them the next segment quicker than a linear search.
> > > >> >>
> > > >> >> I am keen to hear your thoughts!
> > > >> >>
> > > >> >> Best,
> > > >> >> Christo
> > > >> >>
> > > >> >> On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >>
> > > >> >> > Hi Luke,
> > > >> >> >
> > > >> >> > Thanks for the review!
> > > >> >> >
> > > >> >> > > Do you think it is helpful if we store the "least abort start offset in the segment", and -1 means no txnIndex. So that we can have a way to know if we need to fetch this txn index or not.
> > > >> >> >
> > > >> >> > 1. No, this change won't have an effect. To find the upper-bound offset [1], we have to fetch that segment's offset index file. The RemoteIndexCache [2] fetches all the 3 index files together and caches them for subsequent use, so this improvement won't have an effect as the current segment txn index gets downloaded anyway.
> > > >> >> >
> > > >> >> > 2. The reason for choosing boolean is to make the change backward compatible. There can be existing RLM events for the uploaded segments. The default value of `txnIdxEmpty` is false so the *old* RLM events are assumed to contain the txn index files and those files are downloaded if they exist.
> > > >> >> >
> > > >> >> > [1]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
> > > >> >> > [2]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
> > > >> >> >
> > > >> >> > Thanks,
> > > >> >> > Kamal
> > > >> >> >
> > > >> >> > On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
> > > >> >> >
> > > >> >> > > Hi Kamal,
> > > >> >> > >
> > > >> >> > > Sorry for the late review.
> > > >> >> > > Thanks for the KIP, this will improve the transaction reading for remote storage.
> > > >> >> > > Overall LGTM, just one minor thought:
> > > >> >> > >
> > > >> >> > > Currently, we only store the `TxnIndexEmpty` bool value in the segment metadata. Do you think it is helpful if we store the "least abort start offset in the segment", and -1 means no txnIndex. So that we can have a way to know if we need to fetch this txn index or not.
> > > >> >> > >
> > > >> >> > > Thanks.
> > > >> >> > > Luke
> > > >> >> > >
> > > >> >> > > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > >
> > > >> >> > > > Hi all,
> > > >> >> > > >
> > > >> >> > > > If there are no more comments, I'll start a voting thread soon.
> > > >> >> > > >
> > > >> >> > > > Thanks,
> > > >> >> > > > Kamal
> > > >> >> > > >
> > > >> >> > > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > > >
> > > >> >> > > > > Bumping this thread again for review!
> > > >> >> > > > >
> > > >> >> > > > > Reduced the scope of the proposal to minimum. We will be adding only one field (txnIdxEmpty) to the RemoteLogSegmentMetadata event which is backward compatible. PTAL.
> > > >> >> > > > >
> > > >> >> > > > > Thanks,
> > > >> >> > > > > Kamal
> > > >> >> > > > >
> > > >> >> > > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > > > >
> > > >> >> > > > >> Bumping this thread for KIP review!
> > > >> >> > > > >>
> > > >> >> > > > >> We can go for the simplest solution that is proposed in this KIP and it can be improved in the subsequent iteration. PTAL.
> > > >> >> > > > >>
> > > >> >> > > > >> Thanks,
> > > >> >> > > > >> Kamal
> > > >> >> > > > >>
> > > >> >> > > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > > > >>
> > > >> >> > > > >>> Hi Divij,
> > > >> >> > > > >>>
> > > >> >> > > > >>> Thanks for the review! And, sorry for the late reply.
> > > >> >> > > > >>>
> > > >> >> > > > >>> From the UnifiedLog.scala <https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427> doc:
> > > >> >> > > > >>>
> > > >> >> > > > >>> """
> > > >> >> > > > >>> The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
> > > >> >> > > > >>> * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
> > > >> >> > > > >>> * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
> > > >> >> > > > >>> * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
> > > >> >> > > > >>> * beyond the high watermark.
> > > >> >> > > > >>> """
> > > >> >> > > > >>> While rolling the active segment to passive, if the LSO equals the HW, then all the messages in that segment are decided and we can store the `lastStableOffsetLag` as an attribute in the rolled segment. We can then propagate the `lastStableOffsetLag` information in the RemoteLogMetadata events.
> > > >> >> > > > >>>
> > > >> >> > > > >>> While reading the remote log segment, if the `lastStableOffsetLag` is 0, then there is no need to traverse to the subsequent segments for aborted transactions, which covers the dominant case where the partition had no transactions at all.
> > > >> >> > > > >>>
> > > >> >> > > > >>> With log compaction, the shrunk segments might get merged. One option is to take the max of `lastStableOffsetLag` and store it in the new LogSegment. Since tiered storage does not support compacted topics / historical compacted topics, we can omit this case.
> > > >> >> > > > >>>
> > > >> >> > > > >>> If this approach looks good, I can update the KIP with the details.
> > > >> >> > > > >>>
> > > >> >> > > > >>> --
> > > >> >> > > > >>> Kamal
> > > >> >> > > > >>>
> > > >> >> > > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > >> >> > > > >>>
> > > >> >> > > > >>>> Hi Kamal
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> Thanks for the bump. I have been thinking about this passively for the past few days.
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> The simplest solution is to store a state at segment level metadata. The state should specify whether the trx index is empty or not. It would be populated during segment archival. We would then iterate over the metadata for future segments without having to make a remote call to download the trx index itself.
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> The other solution for storing state at a partition level wouldn't work, as you mentioned, because we will have to change the state on every mutation to the log i.e. at expiration of segments and append.
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> I have been thinking whether we can do something better than the simple solution, hence the delay in replying. Let me tell you my half baked train of thoughts, perhaps, you can explore this as well. I have been thinking about using LSO (last stable offset) to handle the case when the partition never had any transactions. For a partition which never had any transaction, I would assume that the LSO is never initialized (or is equal to log start offset)? Or is it equal to HW in that case? This is something that I am yet to verify. If this idea works, then we would not have to iterate through the metadata for the dominant case where the partition had no transactions at all.
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> --
> > > >> >> > > > >>>> Divij Vaidya
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > > > >>>>
> > > >> >> > > > >>>> > Bump. Please review this proposal.
> > > >> >> > > > >>>> >
> > > >> >> > > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > > > >>>> >
> > > >> >> > > > >>>> > > Divij,
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4 review comments.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > > 4. Potential alternative - Instead of having an algorithm where we traverse across segment metadata and looking for isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across metadata for all segments by using techniques such as skip list etc.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > This is a good point to optimize the scan. We need to maintain the skip-list for each leader-epoch. With unclean leader election, some brokers may not have the complete lineage. This will expand the scope of the work.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > In this version, we plan to optimize only for the below 2 cases:
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > 1. A partition does not have the transaction index for any of the uploaded segments. The individual log segments `isTxnIdxEmpty` flag can be reduced to a single flag in RLMM (using AND operator) that can serve the query - "Is all the transaction indexes empty for a partition?". If yes, then we can directly scan the local-log for aborted transactions.
> > > >> >> > > > >>>> > > 2. A partition is produced using the transactional producer. The assumption made is that the transaction will either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only a few consecutive remote log segments to collect the aborted transactions.
> > > >> >> > > > >>>> > > 3. A partition is being produced with both normal and transactional producers. In this case, we will be doing linear traversal. Maintaining a skip-list might improve the performance but we delegate the RLMM implementation to users. If implemented incorrectly, then it can lead to delivery of the aborted transaction records to the consumer.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > I notice two drawbacks with the reduction method as proposed in the KIP:
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > 1. Even if one segment has a transaction index, then we have to iterate over all the metadata events.
> > > >> >> > > > >>>> > > 2. Assume that there are 10 segments and segment-5 has a txn index. Once the first 6 segments are deleted, due to breach by time/size/start-offset, then we should return `true` for "Is all the transaction indexes empty for a partition?" query but it will return `false` until the broker gets restarted and we have to resort to iterate over all the metadata events.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > > 5. Potential alternative#2 - We know that we may want the indexes of multiple higher segments. Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > We can implement parallel-fetch/prefetch once the tiered storage is GAed.
> > > >> >> > > > >>>> > > Since this feature will be useful to prefetch the next remote log segment and it expands the scope of the work.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > > 6. Should the proposed API take "segmentId" as a parameter instead of "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property of a partition, instead it's a property of a specific segment.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > We propose to use the `topicIdPartition` in RemoteLogMetadataManager. The implementation can fold/reduce the value of the individual log segment `isTxnIdEmpty` flag. This is added to avoid scanning all the metadata events when the partition does not have a transaction index in any of the segments.
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > >> >> > > > >>>> > >
> > > >> >> > > > >>>> > >> Hi Kamal
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> Thanks for bringing this up. This is a problem worth solving.
> > > >> >> > > > >>>> > >> We have faced this in situations where some Kafka clients default to read_committed mode and end up having high latencies for remote fetches due to this traversal across all segments.
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> First some nits to clarify the KIP:
> > > >> >> > > > >>>> > >> 1. The motivation should make it clear that traversal of all segments is only in the worst case. If I am not mistaken (please correct me if wrong), the traversal stops when it has found a segment containing LSO.
> > > >> >> > > > >>>> > >> 2. There is nothing like a non-txn topic. A transaction may be started on any topic. Perhaps, rephrase the statement in the KIP so that it is clear to the reader.
> > > >> >> > > > >>>> > >> 3. The hyperlink in the "the broker has to traverse all the..." seems incorrect. Did you want to point to https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444 ?
> > > >> >> > > > >>>> > >> 4. In the testing section, could we add a test plan? For example, I would list down adding a test which would verify the number of calls made to RLMM. This test would have a higher number of calls earlier vs. after this KIP.
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> Other thoughts:
> > > >> >> > > > >>>> > >> 4. Potential alternative - Instead of having an algorithm where we traverse across segment metadata and looking for isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across metadata for all segments by using techniques such as skip list etc.
> > > >> >> > > > >>>> > >> 5. Potential alternative#2 - We know that we may want the indexes of multiple higher segments. Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.
> > > >> >> > > > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter instead of "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property of a partition, instead it's a property of a specific segment.
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> Looking forward to hearing your thoughts about the alternatives. Let's get this fixed.
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> --
> > > >> >> > > > >>>> > >> Divij Vaidya
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> > > >> >> > > > >>>> > >>
> > > >> >> > > > >>>> > >> > Hi all,
> > > >> >> > > > >>>> > >> >
> > > >> >> > > > >>>> > >> > I have opened a KIP-1058 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic> to reduce the pressure on remote storage when transactional consumers are reading non-txn topics from remote storage.
> > > >> >> > > > >>>> > >> >
> > > >> >> > > > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > > >> >> > > > >>>> > >> >
> > > >> >> > > > >>>> > >> > Feedback and suggestions are welcome.
> > > >> >> > > > >>>> > >> >
> > > >> >> > > > >>>> > >> > Thanks,
> > > >> >> > > > >>>> > >> > Kamal
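
As a footnote to the skip-list discussion in this thread, here is a rough sketch of what a per-epoch "next segment with a txn index" lookup could look like. Everything in it (`TxnIndexLookupSketch`, `Segment`, `addSegment`, string segment ids) is hypothetical and only illustrates the idea; it is not the TopicBased RLMM implementation, and the real API returns `Optional<RemoteLogSegmentMetadata>`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of a per-epoch lookup; not the TopicBased RLMM code.
final class TxnIndexLookupSketch {

    // Minimal segment description; all fields are illustrative.
    static final class Segment {
        final String id;
        final long endOffset; // last offset covered by the segment in this epoch

        Segment(String id, long endOffset) {
            this.id = id;
            this.endOffset = endOffset;
        }
    }

    // leader epoch -> (start offset within that epoch -> segment), holding
    // only the segments whose transaction index is non-empty.
    private final Map<Integer, ConcurrentSkipListMap<Long, Segment>> byEpoch =
            new ConcurrentHashMap<>();

    void addSegment(int epoch, long startOffset, long endOffset, String id, boolean txnIdxEmpty) {
        if (txnIdxEmpty) {
            return; // segments without aborted txn entries are never indexed
        }
        byEpoch.computeIfAbsent(epoch, e -> new ConcurrentSkipListMap<>())
               .put(startOffset, new Segment(id, endOffset));
    }

    // Returns the segment that covers `offset` in this epoch if it has a txn
    // index, otherwise the first later segment in the same epoch that has one.
    Optional<String> nextSegmentWithTxnIndex(int epoch, long offset) {
        ConcurrentSkipListMap<Long, Segment> segments = byEpoch.get(epoch);
        if (segments == null) {
            return Optional.empty();
        }
        Map.Entry<Long, Segment> floor = segments.floorEntry(offset);
        if (floor != null && floor.getValue().endOffset >= offset) {
            return Optional.of(floor.getValue().id);
        }
        Map.Entry<Long, Segment> next = segments.ceilingEntry(offset);
        return next == null ? Optional.empty() : Optional.of(next.getValue().id);
    }

    public static void main(String[] args) {
        TxnIndexLookupSketch rlmm = new TxnIndexLookupSketch();
        rlmm.addSegment(0, 0L, 99L, "seg-a", true);     // empty txn index, skipped
        rlmm.addSegment(0, 100L, 199L, "seg-b", false);
        rlmm.addSegment(1, 200L, 299L, "seg-c", false);

        System.out.println(rlmm.nextSegmentWithTxnIndex(0, 0L));   // Optional[seg-b]
        System.out.println(rlmm.nextSegmentWithTxnIndex(0, 200L)); // Optional.empty
        System.out.println(rlmm.nextSegmentWithTxnIndex(1, 200L)); // Optional[seg-c]
    }
}
```

Because the lookup is scoped to one epoch, a caller would have to repeat it for each later epoch in the leader-epoch lineage when the current epoch returns nothing, which is the per-epoch querying described above.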