Hi all,

If the review comments are addressed and the KIP looks good, please submit
your vote on the voting thread.
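For anyone catching up before voting, here is a minimal sketch of the per-epoch lookup idea behind `nextSegmentWithTxnIndex`. It is an illustration under simplifying assumptions, not the code in the PR: the class `TxnIndexLookupSketch`, its method names, and the (base offset -> end offset) bookkeeping are hypothetical, and it ignores the case where a segment spans several leader epochs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: none of these names are Kafka APIs.
final class TxnIndexLookupSketch {

    // leader epoch -> (segment base offset -> segment end offset).
    // Only segments with a non-empty transaction index are stored, so the
    // map stays empty for partitions that never saw an aborted transaction.
    private final Map<Integer, ConcurrentSkipListMap<Long, Long>> segmentsWithTxnIndex =
            new ConcurrentHashMap<>();

    // Called when segment metadata is registered (assumed hook).
    void addSegment(int epoch, long baseOffset, long endOffset, boolean txnIdxEmpty) {
        if (txnIdxEmpty) {
            return; // nothing to fetch later, so do not track it
        }
        segmentsWithTxnIndex
                .computeIfAbsent(epoch, e -> new ConcurrentSkipListMap<>())
                .put(baseOffset, endOffset);
    }

    // Returns the base offset of the first tracked segment in `epoch` that
    // contains `offset` or starts after it, without scanning every segment.
    Optional<Long> nextSegmentWithTxnIndex(int epoch, long offset) {
        ConcurrentSkipListMap<Long, Long> segments = segmentsWithTxnIndex.get(epoch);
        if (segments == null) {
            return Optional.empty();
        }
        // A tracked segment that starts at or before `offset` and still covers it.
        Map.Entry<Long, Long> floor = segments.floorEntry(offset);
        if (floor != null && floor.getValue() >= offset) {
            return Optional.of(floor.getKey());
        }
        // Otherwise the next tracked segment that starts after `offset`, if any.
        return Optional.ofNullable(segments.higherKey(offset));
    }

    public static void main(String[] args) {
        TxnIndexLookupSketch lookup = new TxnIndexLookupSketch();
        lookup.addSegment(0, 0L, 99L, true);     // empty txn index, skipped
        lookup.addSegment(0, 100L, 199L, false); // has aborted txn entries
        lookup.addSegment(0, 200L, 299L, true);
        lookup.addSegment(0, 300L, 399L, false);
        System.out.println(lookup.nextSegmentWithTxnIndex(0, 0L));
        System.out.println(lookup.nextSegmentWithTxnIndex(0, 250L));
        System.out.println(lookup.nextSegmentWithTxnIndex(0, 400L));
    }
}
```

The caller would repeat the query for each leader epoch in the lineage, since the sketch (like the proposed API) answers for one epoch at a time.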
Thanks,
Kamal

On Sat, Nov 2, 2024 at 7:52 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> I've opened a PR which adds an integration test
> <https://github.com/apache/kafka/pull/17668/files#diff-a60b518846fc0f770164d55b6d7cd31e03a002514377578c80c6e22cc120af40R88>
> to show the impact of this change/KIP. PTAL.
>
> --
> Kamal
>
> On Fri, Nov 1, 2024 at 3:31 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
>> Thanks for the review!
>>
>> > 5
>> Updated the KIP to include the `isEmpty` method in the transaction index.
>>
>> > 6
>> You're right. The offset parameter will be equal to the
>> next-segment-to-consider base offset.
>> But the API introduced in RLMM is for *one* epoch. The next epoch
>> start-offset may not be the base-offset. You can refer to this line
>> <https://github.com/apache/kafka/pull/17659/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1857>
>> in the draft PR.
>>
>>
>> On Fri, Nov 1, 2024 at 3:16 PM Divij Vaidya <divijvaidy...@gmail.com>
>> wrote:
>>
>>> Last few things -
>>>
>>> # 5
>>> About setting the TrxIndexEmpty field, could we introduce an isEmpty()
>>> function in TransactionIndex which has the following implementation:
>>>
>>> public boolean isEmpty() {
>>>     return !iterable().iterator().hasNext();
>>> }
>>>
>>>
>>> The advantages of this approach are:
>>> 1. It works for both cases: when the file is not present and also when
>>> the file is present but is empty.
>>> 2. It prevents leaking the underlying implementation of TransactionIndex
>>> outside via the file() method. I think that making file() public is an
>>> implementation leak (for example, what if the trx index is not file
>>> based!).
>>>
>>>
>>> #6
>>> In the documentation for nextSegmentWithTxnIndex, the offset parameter
>>> should be equal to the next-segment-to-consider's base offset, no?
>>> I assume that we will add a new fetch here with nextSegmentBaseOffset
>>>
>>> https://github.com/apache/kafka/blob/346fdbafc539bc48bb66eedae89a15e240007fd9/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1801
>>> . Is there a case where the parameter "offset" will not be equal to the
>>> baseOffset of a segment?
>>>
>>> --
>>> Divij Vaidya
>>>
>>>
>>>
>>> On Fri, Nov 1, 2024 at 10:26 AM Kamal Chandraprakash <
>>> kamal.chandraprak...@gmail.com> wrote:
>>>
>>> > Hi Divij,
>>> >
>>> > Thanks for the detailed review!
>>> >
>>> > > 1, 2, 3, 4
>>> > Updated the KIP-1058
>>> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions>
>>> > with the feedback received and also opened a draft PR #17659
>>> > <https://github.com/apache/kafka/pull/17659/files> for reference.
>>> > PTAL.
>>> >
>>> > > 5. How are we determining the value of the TrxIndexEmpty field on
>>> > > segment rotation?
>>> > The transaction index file is optional: the file does not exist when
>>> > there are no aborted txn entries for a segment, so we will be using the
>>> > file null check. Also, updated it in the KIP.
>>> >
>>> > Thanks,
>>> > Kamal
>>> >
>>> >
>>> > On Tue, Oct 29, 2024 at 8:47 PM Divij Vaidya <divijvaidy...@gmail.com>
>>> > wrote:
>>> >
>>> > > A few more points to discuss (please add to the KIP as well)
>>> > >
>>> > > 5. How are we determining the value of the TrxIndexEmpty field on
>>> > > segment rotation?
>>> > >
>>> > > One option is to do a boolean txnIdxEmpty =
>>> > > segment.txnIndex().allAbortedTxns().isEmpty() but this will have an
>>> > > overhead of reading the contents of the file and storing them in
>>> > > memory, when we have a non-empty index.
>>> > > The other option (preferred) is to add an isEmpty() public method to the
>>> > > TransactionIndex and perform a segment.txnIndex().isEmpty() check which
>>> > > will internally use Files.size() java API.
>>> > >
>>> > > On Tue, Oct 29, 2024 at 1:21 PM Divij Vaidya <divijvaidy...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Let's get the ball rolling (again) on this one.
>>> > > >
>>> > > > Kamal, could you please add the following to the KIP:
>>> > > > 1. the API as discussed above. Please add the failure modes for this API
>>> > > > as well such as the exceptions thrown and a recommendation on how a
>>> > > > caller is expected to handle those. I am assuming that the three
>>> > > > parameters for this API will be topicPartition, epoch and offset.
>>> > > > 2. implementation details for Topic based RLMM. I am assuming that the
>>> > > > plugin will default the field to false if this field is absent (case of
>>> > > > old metadata).
>>> > > > 3. In the test plan section, additionally, we need to assert that we
>>> > > > don't read metadata for all segments (i.e. it is not a linear search)
>>> > > > from the Topic based RLMM.
>>> > > > 4. in the compatibility section, please document how the existing
>>> > > > clusters with Tiered Storage metadata will work during/after a rolling
>>> > > > upgrade to a version which contains this new change.
>>> > > >
>>> > > > --
>>> > > > Divij Vaidya
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Fri, Oct 11, 2024 at 12:26 PM Kamal Chandraprakash <
>>> > > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >
>>> > > >> Bump for review.
>>> > > >>
>>> > > >> If the additional proposal looks good, I'll append them to the KIP.
>>> > > >> PTAL.
>>> > > >>
>>> > > >> New API in RLMM#nextRemoteLogSegmentMetadataWithTxnIndex
>>> > > >>
>>> > > >> --
>>> > > >> Kamal
>>> > > >>
>>> > > >> On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <
>>> > > >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >>
>>> > > >> > Hi Christo,
>>> > > >> >
>>> > > >> > Thanks for the review!
>>> > > >> >
>>> > > >> > Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM
>>> > > >> > helps to reduce the complexity of linear search. With this API, we
>>> > > >> > have to:
>>> > > >> >
>>> > > >> > 1. Maintain one more skip-list [1] for each of the epochs in the
>>> > > >> > partition in RLMM, which might increase the memory usage of the
>>> > > >> > TopicBased RLMM implementation.
>>> > > >> >     1a) The skip-list will be empty when there are no aborted txn
>>> > > >> > entries for a partition/epoch, which is the predominant case.
>>> > > >> >     1b) The skip-list will act as a duplicate when *most* of the
>>> > > >> > segments have aborted txn entries. Assuming aborted txns are quite
>>> > > >> > rare, this should be fine.
>>> > > >> > 2. Change the logic to retrieve the aborted txns (we have to query the
>>> > > >> > nextRLSMWithTxnIndex for each of the leader-epochs).
>>> > > >> > 3. Accept a logic divergence from how we retrieve the aborted txn
>>> > > >> > entries compared to the local-log.
>>> > > >> >
>>> > > >> > The approach looks good to me. If everyone is aligned, then we can
>>> > > >> > proceed to add this API to RLMM.
>>> > > >> >
>>> > > >> > Another option I was thinking of is to capture the
>>> > > >> > `lastStableOffsetLag` [2] while rotating the segment.
>>> > > >> > But that is a bigger change we can take later.
>>> > > >> >
>>> > > >> > [1]:
>>> > > >> > https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
>>> > > >> > [2]:
>>> > > >> > https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432
>>> > > >> >
>>> > > >> >
>>> > > >> > Thanks,
>>> > > >> > Kamal
>>> > > >> >
>>> > > >> > On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com>
>>> > > >> > wrote:
>>> > > >> >
>>> > > >> >> Heya,
>>> > > >> >>
>>> > > >> >> Apologies for the delay. I have been thinking about this problem
>>> > > >> >> recently as well and while I believe storing a boolean in the metadata
>>> > > >> >> is good, I think we can do better by introducing a new method to the
>>> > > >> >> RLMM along the lines of
>>> > > >> >>
>>> > > >> >> Optional<RemoteLogSegmentMetadata>
>>> > > >> >> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition,
>>> > > >> >> int epochForOffset, long offset) throws RemoteStorageException
>>> > > >> >>
>>> > > >> >> This will help plugin implementers to build optimisations such as skip
>>> > > >> >> lists which will give them the next segment quicker than a linear
>>> > > >> >> search.
>>> > > >> >>
>>> > > >> >> I am keen to hear your thoughts!
>>> > > >> >>
>>> > > >> >> Best,
>>> > > >> >> Christo
>>> > > >> >>
>>> > > >> >> On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <
>>> > > >> >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >>
>>> > > >> >> > Hi Luke,
>>> > > >> >> >
>>> > > >> >> > Thanks for the review!
>>> > > >> >> >
>>> > > >> >> > > Do you think it is helpful if we store the "least abort start
>>> > > >> >> > > offset in the segment", and -1 means no txnIndex.
>>> > > >> >> > > So that we can have a way to know if we need to fetch this txn
>>> > > >> >> > > index or not.
>>> > > >> >> >
>>> > > >> >> > 1. No, this change won't have an effect. To find the upper-bound
>>> > > >> >> >    offset [1], we have to fetch that segment's offset index file. The
>>> > > >> >> >    RemoteIndexCache [2] fetches all the 3 index files together and
>>> > > >> >> >    caches them for subsequent use, so this improvement won't have an
>>> > > >> >> >    effect as the current segment txn index gets downloaded anyway.
>>> > > >> >> >
>>> > > >> >> > 2. The reason for choosing boolean is to make the change backward
>>> > > >> >> >    compatible. There can be existing RLM events for the uploaded
>>> > > >> >> >    segments. The default value of `txnIdxEmpty` is false so the *old*
>>> > > >> >> >    RLM events are assumed to contain the txn index files and those
>>> > > >> >> >    files are downloaded if they exist.
>>> > > >> >> >
>>> > > >> >> > [1]:
>>> > > >> >> > https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
>>> > > >> >> > [2]:
>>> > > >> >> > https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
>>> > > >> >> >
>>> > > >> >> > Thanks,
>>> > > >> >> > Kamal
>>> > > >> >> >
>>> > > >> >> > On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
>>> > > >> >> >
>>> > > >> >> > > Hi Kamal,
>>> > > >> >> > >
>>> > > >> >> > > Sorry for the late review.
>>> > > >> >> > > Thanks for the KIP, this will improve the transaction reading for
>>> > > >> >> > > remote storage.
>>> > > >> >> > > Overall LGTM, just one minor thought:
>>> > > >> >> > >
>>> > > >> >> > > Currently, we only store the `TxnIndexEmpty` bool value in the
>>> > > >> >> > > segment metadata.
>>> > > >> >> > > Do you think it is helpful if we store the "least abort start offset
>>> > > >> >> > > in the segment", and -1 means no txnIndex. So that we can have a way
>>> > > >> >> > > to know if we need to fetch this txn index or not.
>>> > > >> >> > >
>>> > > >> >> > > Thanks.
>>> > > >> >> > > Luke
>>> > > >> >> > >
>>> > > >> >> > > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
>>> > > >> >> > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > >
>>> > > >> >> > > > Hi all,
>>> > > >> >> > > >
>>> > > >> >> > > > If there are no more comments, I'll start a voting thread soon.
>>> > > >> >> > > >
>>> > > >> >> > > > Thanks,
>>> > > >> >> > > > Kamal
>>> > > >> >> > > >
>>> > > >> >> > > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
>>> > > >> >> > > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > >
>>> > > >> >> > > > > Bumping this thread again for review!
>>> > > >> >> > > > >
>>> > > >> >> > > > > Reduced the scope of the proposal to minimum. We will be adding
>>> > > >> >> > > > > only one field (txnIdxEmpty) to the
>>> > > >> >> > > > > RemoteLogSegmentMetadata event which is backward compatible.
>>> > > >> >> > > > > PTAL.
>>> > > >> >> > > > >
>>> > > >> >> > > > > Thanks,
>>> > > >> >> > > > > Kamal
>>> > > >> >> > > > >
>>> > > >> >> > > > >
>>> > > >> >> > > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
>>> > > >> >> > > > > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >
>>> > > >> >> > > > >> Bumping this thread for KIP review!
>>> > > >> >> > > > >>
>>> > > >> >> > > > >> We can go for the simplest solution that is proposed in this KIP
>>> > > >> >> > > > >> and it can be improved in the subsequent iteration. PTAL.
>>> > > >> >> > > > >>
>>> > > >> >> > > > >> Thanks,
>>> > > >> >> > > > >> Kamal
>>> > > >> >> > > > >>
>>> > > >> >> > > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
>>> > > >> >> > > > >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>
>>> > > >> >> > > > >>> Hi Divij,
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> Thanks for the review! And, sorry for the late reply.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> From the UnifiedLog.scala
>>> > > >> >> > > > >>> <https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427>
>>> > > >> >> > > > >>> doc:
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> """
>>> > > >> >> > > > >>> The last stable offset (LSO) is defined as the first offset such
>>> > > >> >> > > > >>> that all lower offsets have been "decided."
>>> > > >> >> > > > >>>  * Non-transactional messages are considered decided immediately,
>>> > > >> >> > > > >>> but transactional messages are only decided when
>>> > > >> >> > > > >>>  * the corresponding COMMIT or ABORT marker is written.
>>> > > >> >> > > > >>> This implies that the last stable offset will be equal
>>> > > >> >> > > > >>>  * to the high watermark if there are no transactional messages
>>> > > >> >> > > > >>> in the log. Note also that the LSO cannot advance
>>> > > >> >> > > > >>>  * beyond the high watermark.
>>> > > >> >> > > > >>> """
>>> > > >> >> > > > >>> While rolling the active segment to passive, if the LSO equals
>>> > > >> >> > > > >>> the HW, then all the messages in that segment are decided and we
>>> > > >> >> > > > >>> can store the `lastStableOffsetLag` as an attribute in the
>>> > > >> >> > > > >>> rolled segment. We can then propagate the `lastStableOffsetLag`
>>> > > >> >> > > > >>> information in the RemoteLogMetadata events.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> While reading the remote log segment, if the
>>> > > >> >> > > > >>> `lastStableOffsetLag` is 0, then there is no need to traverse to
>>> > > >> >> > > > >>> the subsequent segments for aborted transactions, which covers
>>> > > >> >> > > > >>> the dominant case where the partition had no transactions at
>>> > > >> >> > > > >>> all.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> With log compaction, the shrunken segments might get merged. One
>>> > > >> >> > > > >>> option is to take the max of `lastStableOffsetLag` and store it
>>> > > >> >> > > > >>> in the new LogSegment. Since tiered storage does not support
>>> > > >> >> > > > >>> compacted topics / historical compacted topics, we can omit this
>>> > > >> >> > > > >>> case.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> If this approach looks good, I can update the KIP with the
>>> > > >> >> > > > >>> details.
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> --
>>> > > >> >> > > > >>> Kamal
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <
>>> > > >> >> > > > >>> divijvaidy...@gmail.com> wrote:
>>> > > >> >> > > > >>>
>>> > > >> >> > > > >>>> Hi Kamal
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> Thanks for the bump. I have been thinking about this passively
>>> > > >> >> > > > >>>> for the past few days.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> The simplest solution is to store a state at segment level
>>> > > >> >> > > > >>>> metadata. The state should specify whether the trx index is
>>> > > >> >> > > > >>>> empty or not. It would be populated during segment archival. We
>>> > > >> >> > > > >>>> would then iterate over the metadata for future segments
>>> > > >> >> > > > >>>> without having to make a remote call to download the trx index
>>> > > >> >> > > > >>>> itself.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> The other solution for storing state at a partition level
>>> > > >> >> > > > >>>> wouldn't work, as you mentioned, because we will have to change
>>> > > >> >> > > > >>>> the state on every mutation to the log i.e. at expiration of
>>> > > >> >> > > > >>>> segments and append.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> I have been thinking whether we can do something better than
>>> > > >> >> > > > >>>> the simple solution, hence the delay in replying. Let me share
>>> > > >> >> > > > >>>> my half-baked train of thought; perhaps you can explore this as
>>> > > >> >> > > > >>>> well. I have been thinking about using LSO (last stable offset)
>>> > > >> >> > > > >>>> to handle the case when the partition never had any
>>> > > >> >> > > > >>>> transactions. For a partition which never had any transaction,
>>> > > >> >> > > > >>>> I would assume that the LSO is never initialized (or is equal
>>> > > >> >> > > > >>>> to log start offset)? Or is it equal to HW in that case? This
>>> > > >> >> > > > >>>> is something that I am yet to verify. If this idea works, then
>>> > > >> >> > > > >>>> we would not have to iterate through the metadata for the
>>> > > >> >> > > > >>>> dominant case where the partition had no transactions at all.
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> --
>>> > > >> >> > > > >>>> Divij Vaidya
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
>>> > > >> >> > > > >>>> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>>>
>>> > > >> >> > > > >>>> > Bump. Please review this proposal.
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
>>> > > >> >> > > > >>>> > kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>>> >
>>> > > >> >> > > > >>>> > > Divij,
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4
>>> > > >> >> > > > >>>> > > review comments.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > > 4. Potential alternative - Instead of having an algorithm
>>> > > >> >> > > > >>>> > > > where we traverse across segment metadata and look for
>>> > > >> >> > > > >>>> > > > the isTxnIdxEmpty flag, should we directly introduce a
>>> > > >> >> > > > >>>> > > > nextSegmentWithTrxInx() function? This would allow
>>> > > >> >> > > > >>>> > > > implementers to optimize the otherwise linear scan across
>>> > > >> >> > > > >>>> > > > metadata for all segments by using techniques such as
>>> > > >> >> > > > >>>> > > > skip list etc.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > This is a good point to optimize the scan. We need to
>>> > > >> >> > > > >>>> > > maintain the skip-list for each leader-epoch. With unclean
>>> > > >> >> > > > >>>> > > leader election, some brokers may not have the complete
>>> > > >> >> > > > >>>> > > lineage. This will expand the scope of the work.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > In this version, we plan to optimize only for the below 2
>>> > > >> >> > > > >>>> > > cases:
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > 1. A partition does not have the transaction index for any
>>> > > >> >> > > > >>>> > >    of the uploaded segments. The individual log segments'
>>> > > >> >> > > > >>>> > >    `isTxnIdxEmpty` flags can be reduced to a single flag in
>>> > > >> >> > > > >>>> > >    RLMM (using the AND operator) that can serve the query
>>> > > >> >> > > > >>>> > >    "Are all the transaction indexes empty for a
>>> > > >> >> > > > >>>> > >    partition?". If yes, then we can directly scan the
>>> > > >> >> > > > >>>> > >    local-log for aborted transactions.
>>> > > >> >> > > > >>>> > > 2. A partition is produced using the transactional
>>> > > >> >> > > > >>>> > >    producer. The assumption made is that the transaction
>>> > > >> >> > > > >>>> > >    will either commit/rollback within 15 minutes (default
>>> > > >> >> > > > >>>> > >    transaction.max.timeout.ms = 15 mins), so we may have to
>>> > > >> >> > > > >>>> > >    search only a few consecutive remote log segments to
>>> > > >> >> > > > >>>> > >    collect the aborted transactions.
>>> > > >> >> > > > >>>> > > 3. A partition is being produced with both normal and
>>> > > >> >> > > > >>>> > >    transactional producers. In this case, we will be doing
>>> > > >> >> > > > >>>> > >    linear traversal.
>>> > > >> >> > > > >>>> > >    Maintaining a skip-list might improve the performance
>>> > > >> >> > > > >>>> > >    but we delegate the RLMM implementation to users. If
>>> > > >> >> > > > >>>> > >    implemented incorrectly, then it can lead to delivery of
>>> > > >> >> > > > >>>> > >    the aborted transaction records to the consumer.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > I notice two drawbacks with the reduction method as
>>> > > >> >> > > > >>>> > > proposed in the KIP:
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > 1. Even if only one segment has a transaction index, we
>>> > > >> >> > > > >>>> > >    have to iterate over all the metadata events.
>>> > > >> >> > > > >>>> > > 2. Assume that there are 10 segments and segment-5 has a
>>> > > >> >> > > > >>>> > >    txn index. Once the first 6 segments are deleted, due to
>>> > > >> >> > > > >>>> > >    breach by time/size/start-offset, we should return
>>> > > >> >> > > > >>>> > >    `true` for the "Are all the transaction indexes empty
>>> > > >> >> > > > >>>> > >    for a partition?" query, but it will return `false`
>>> > > >> >> > > > >>>> > >    until the broker gets restarted, and we have to resort
>>> > > >> >> > > > >>>> > >    to iterating over all the metadata events.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > > 5. Potential alternative#2 - We know that we may want the
>>> > > >> >> > > > >>>> > > > indexes of multiple higher segments.
>>> > > >> >> > > > >>>> > > > Instead of fetching them sequentially, we could implement
>>> > > >> >> > > > >>>> > > > a parallel fetch or a pre-fetch for the indexes. This
>>> > > >> >> > > > >>>> > > > would help hide the latency of sequentially fetching the
>>> > > >> >> > > > >>>> > > > trx indexes.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > We can implement parallel-fetch/prefetch once the tiered
>>> > > >> >> > > > >>>> > > storage is GAed. This feature will also be useful to
>>> > > >> >> > > > >>>> > > prefetch the next remote log segment, and it expands the
>>> > > >> >> > > > >>>> > > scope of the work.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > > 6. Should the proposed API take "segmentId" as a
>>> > > >> >> > > > >>>> > > > parameter instead of "topicIdPartition"? Suggesting
>>> > > >> >> > > > >>>> > > > because isTxnIdEmpty is not a property of a partition,
>>> > > >> >> > > > >>>> > > > instead it's a property of a specific segment.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > We propose to use the `topicIdPartition` in
>>> > > >> >> > > > >>>> > > RemoteLogMetadataManager.
>>> > > >> >> > > > >>>> > > The implementation can fold/reduce the value of the
>>> > > >> >> > > > >>>> > > individual log segment `isTxnIdEmpty` flag.
>>> > > >> >> > > > >>>> > > This is added to avoid scanning all the metadata events
>>> > > >> >> > > > >>>> > > when the partition does not have a transaction index in any
>>> > > >> >> > > > >>>> > > of the segments.
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <
>>> > > >> >> > > > >>>> > > divijvaidy...@gmail.com> wrote:
>>> > > >> >> > > > >>>> > >
>>> > > >> >> > > > >>>> > >> Hi Kamal
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> Thanks for bringing this up. This is a problem worth
>>> > > >> >> > > > >>>> > >> solving. We have faced this in situations where some Kafka
>>> > > >> >> > > > >>>> > >> clients default to read_committed mode and end up having
>>> > > >> >> > > > >>>> > >> high latencies for remote fetches due to this traversal
>>> > > >> >> > > > >>>> > >> across all segments.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> First some nits to clarify the KIP:
>>> > > >> >> > > > >>>> > >> 1. The motivation should make it clear that traversal of
>>> > > >> >> > > > >>>> > >> all segments is only in the worst case. If I am not
>>> > > >> >> > > > >>>> > >> mistaken (please correct me if wrong), the traversal stops
>>> > > >> >> > > > >>>> > >> when it has found a segment containing LSO.
>>> > > >> >> > > > >>>> > >> 2. There is nothing like a non-txn topic. A transaction
>>> > > >> >> > > > >>>> > >> may be started on any topic.
>>> > > >> >> > > > >>>> > >> Perhaps, rephrase the statement in the KIP so that it is
>>> > > >> >> > > > >>>> > >> clear to the reader.
>>> > > >> >> > > > >>>> > >> 3. The hyperlink in "the broker has to traverse all
>>> > > >> >> > > > >>>> > >> the..." seems incorrect. Did you want to point to
>>> > > >> >> > > > >>>> > >> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
>>> > > >> >> > > > >>>> > >> ?
>>> > > >> >> > > > >>>> > >> 4. In the testing section, could we add a test plan? For
>>> > > >> >> > > > >>>> > >> example, I would list adding a test which would verify the
>>> > > >> >> > > > >>>> > >> number of calls made to RLMM. This test would have a
>>> > > >> >> > > > >>>> > >> higher number of calls before this KIP than after it.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> Other thoughts:
>>> > > >> >> > > > >>>> > >> 4. Potential alternative - Instead of having an algorithm
>>> > > >> >> > > > >>>> > >> where we traverse across segment metadata and look for the
>>> > > >> >> > > > >>>> > >> isTxnIdxEmpty flag, should we directly introduce a
>>> > > >> >> > > > >>>> > >> nextSegmentWithTrxInx() function?
>>> > > >> >> > > > >>>> > >> This would allow implementers to optimize the otherwise
>>> > > >> >> > > > >>>> > >> linear scan across metadata for all segments by using
>>> > > >> >> > > > >>>> > >> techniques such as skip list etc.
>>> > > >> >> > > > >>>> > >> 5. Potential alternative#2 - We know that we may want the
>>> > > >> >> > > > >>>> > >> indexes of multiple higher segments. Instead of fetching
>>> > > >> >> > > > >>>> > >> them sequentially, we could implement a parallel fetch or
>>> > > >> >> > > > >>>> > >> a pre-fetch for the indexes. This would help hide the
>>> > > >> >> > > > >>>> > >> latency of sequentially fetching the trx indexes.
>>> > > >> >> > > > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter
>>> > > >> >> > > > >>>> > >> instead of "topicIdPartition"? Suggesting because
>>> > > >> >> > > > >>>> > >> isTxnIdEmpty is not a property of a partition, instead
>>> > > >> >> > > > >>>> > >> it's a property of a specific segment.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> Looking forward to hearing your thoughts about the
>>> > > >> >> > > > >>>> > >> alternatives. Let's get this fixed.
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> --
>>> > > >> >> > > > >>>> > >> Divij Vaidya
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
>>> > > >> >> > > > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
>>> > > >> >> > > > >>>> > >>
>>> > > >> >> > > > >>>> > >> > Hi all,
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > I have opened KIP-1058
>>> > > >> >> > > > >>>> > >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic>
>>> > > >> >> > > > >>>> > >> > to reduce the pressure on remote storage when
>>> > > >> >> > > > >>>> > >> > transactional consumers are reading non-txn topics from
>>> > > >> >> > > > >>>> > >> > remote storage.
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > Feedback and suggestions are welcome.
>>> > > >> >> > > > >>>> > >> >
>>> > > >> >> > > > >>>> > >> > Thanks,
>>> > > >> >> > > > >>>> > >> > Kamal
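
As a footnote to the `txnIdxEmpty` discussion quoted above, here is a minimal sketch of two points from the thread: treating a missing or zero-byte index file as empty at segment rotation, and reading an absent flag in old metadata as `false` so the index is still fetched. The class and method names are hypothetical, not Kafka APIs, and the file-size check is only one of the options discussed (the KIP ended up with an `isEmpty` method on the transaction index).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: names are illustrative and are not Kafka APIs.
final class TxnIdxEmptyFlagSketch {

    // A transaction index counts as empty when its file is missing
    // or is present with zero bytes.
    static boolean isTxnIndexEmpty(Path txnIndexFile) {
        try {
            return !Files.exists(txnIndexFile) || Files.size(txnIndexFile) == 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // `flagFromMetadata` is null for metadata events written before the
    // field existed. A missing flag is read as "not empty", so the index
    // is still fetched for segments uploaded by older brokers.
    static boolean shouldFetchTxnIndex(Boolean flagFromMetadata) {
        boolean txnIdxEmpty = flagFromMetadata != null && flagFromMetadata;
        return !txnIdxEmpty;
    }

    public static void main(String[] args) {
        System.out.println(shouldFetchTxnIndex(null)); // old metadata, flag absent
        System.out.println(shouldFetchTxnIndex(true)); // index known to be empty
    }
}
```

Defaulting to "fetch" keeps old segments correct at the cost of a few unnecessary downloads, which matches the backward-compatibility reasoning given for choosing a boolean.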