Hi Divij,

The implementation does not force the plugin to have an optimization. I've updated the public API javadoc to reflect this change. PTAL.
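The request quoted below is to keep the optimized lookup optional for plugins. One way to express that in Java is a default interface method that falls back to a linear scan, which a plugin with a smarter structure can override. This is a minimal sketch under assumptions: `SegmentMetadataStore`, `SegmentMeta`, `listSegments` and `nextSegmentWithTxnIndex` are hypothetical simplifications, not the actual RemoteLogMetadataManager API or the javadoc mentioned above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for RemoteLogSegmentMetadata (illustration only).
record SegmentMeta(long startOffset, long endOffset, int epoch, boolean txnIdxEmpty) {}

// Hypothetical plugin contract; NOT the real RemoteLogMetadataManager interface.
interface SegmentMetadataStore {

    // Plain CRUD-style listing: segments of one leader epoch, in ascending offset order.
    Iterator<SegmentMeta> listSegments(int epoch);

    // First segment of `epoch` that contains `offset` or lies after it, and whose
    // transaction index is not empty. The default is a linear scan, so a "dumb"
    // plugin works unchanged; a plugin with an index (e.g. a skip list) can override it.
    default Optional<SegmentMeta> nextSegmentWithTxnIndex(int epoch, long offset) {
        Iterator<SegmentMeta> it = listSegments(epoch);
        while (it.hasNext()) {
            SegmentMeta s = it.next();
            if (s.endOffset() >= offset && !s.txnIdxEmpty()) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }
}

// Minimal in-memory plugin that only implements listing and inherits the fallback.
final class ListBackedStore implements SegmentMetadataStore {
    private final List<SegmentMeta> segments; // assumed sorted by startOffset

    ListBackedStore(List<SegmentMeta> segments) {
        this.segments = List.copyOf(segments);
    }

    @Override
    public Iterator<SegmentMeta> listSegments(int epoch) {
        return segments.stream().filter(s -> s.epoch() == epoch).iterator();
    }
}
```

In this sketch, a plugin with an optimized structure overrides `nextSegmentWithTxnIndex`, while every other plugin only has to implement `listSegments`.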

On Tue, Nov 5, 2024 at 12:42 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
Sorry for the churn on this one, but circling back to the API.

Today, the API forces the plugins to implement an optimization that directly returns the next segment with a transaction index. This goes against the original invariant of KIP-405, where we wanted the plugins to be fairly dumb CRUD APIs.

An ideal API would allow a plugin to use the optimization where possible and fall back to the usual iteration through all segment metadata where the plugin does not provide the optimization. Could we change the contract of the API so that this optimization is left to the discretion of plugin developers? (The Topic-based RLMM would continue to use the optimization you authored in the PR.)

Thoughts?

--
Divij Vaidya

On Mon, Nov 4, 2024 at 3:01 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
Thanks for patiently addressing all the comments.

I will add my vote in the other thread.

--
Divij Vaidya

On Mon, Nov 4, 2024 at 5:40 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Hi all,

If the review comments are addressed and the KIP looks good, please submit your vote on the voting thread.

Thanks,
Kamal

On Sat, Nov 2, 2024 at 7:52 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
I've opened a PR which adds an integration test <https://github.com/apache/kafka/pull/17668/files#diff-a60b518846fc0f770164d55b6d7cd31e03a002514377578c80c6e22cc120af40R88> to show the impact of this change/KIP. PTAL.

--
Kamal

On Fri, Nov 1, 2024 at 3:31 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Thanks for the review!

> 5
Updated the KIP to include the `isEmpty` method in the transaction index.

> 6
You're right. The offset parameter will be equal to the next-segment-to-consider's base offset. But the API introduced in RLMM is for *one* epoch, and the next epoch's start offset may not be a segment's base offset. You can refer to this line <https://github.com/apache/kafka/pull/17659/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1857> in the draft PR.

On Fri, Nov 1, 2024 at 3:16 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
Last few things:

#5
About setting the TrxIndexEmpty field, could we introduce an isEmpty() function in TransactionIndex with the following implementation:

    public boolean isEmpty() {
        return !iterable().iterator().hasNext();
    }

The advantages of this approach are:
1. It works both when the file is not present and when the file is present but empty.
2. It prevents leaking the underlying implementation of TransactionIndex via the file() method. I think that making file() public is an implementation leak (for example, what if the trx index is not file based?).

#6
In the documentation for nextSegmentWithTxnIndex, the offset parameter should be equal to the next-segment-to-consider's base offset, no? I assume that we will add a new fetch here with nextSegmentBaseOffset <https://github.com/apache/kafka/blob/346fdbafc539bc48bb66eedae89a15e240007fd9/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1801>.
Is there a case where the parameter "offset" will not be equal to the baseOffset of a segment?

--
Divij Vaidya

On Fri, Nov 1, 2024 at 10:26 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Hi Divij,

Thanks for the detailed review!

> 1, 2, 3, 4
Updated KIP-1058 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions> with the feedback received, and also opened draft PR #17659 <https://github.com/apache/kafka/pull/17659/files> for reference. PTAL.

> 5. How are we determining the value of the TrxIndexEmpty field on segment rotation?
The transaction index file is optional: the file does not exist when there are no aborted txn entries for a segment, so we will use a null check on the file. I've also updated this in the KIP.

Thanks,
Kamal

On Tue, Oct 29, 2024 at 8:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
A few more points to discuss (please add them to the KIP as well):

5. How are we determining the value of the TrxIndexEmpty field on segment rotation?

One option is to do boolean txnIdxEmpty = segment.txnIndex().allAbortedTxns().isEmpty(), but this has the overhead of reading the contents of the file and storing them in memory when we have a non-empty index.

The other option (preferred) is to add a public isEmpty() method to TransactionIndex and perform a segment.txnIndex().isEmpty() check, which will internally use the Files.size() Java API.

On Tue, Oct 29, 2024 at 1:21 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
Let's get the ball rolling (again) on this one.

Kamal, could you please add the following to the KIP:
1. The API as discussed above. Please also add the failure modes for this API, such as the exceptions thrown, and a recommendation on how a caller is expected to handle them. I am assuming that the three parameters for this API will be topicPartition, epoch and offset.
2. Implementation details for the Topic-based RLMM. I am assuming that the plugin will default the field to false if it is absent (the case of old metadata).
3. In the test plan section, we additionally need to assert that we don't read the metadata for all segments from the Topic-based RLMM (i.e. that it is not a linear search).
4. In the compatibility section, please document how existing clusters with Tiered Storage metadata will work during and after a rolling upgrade to a version which contains this new change.

--
Divij Vaidya

On Fri, Oct 11, 2024 at 12:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Bump for review.

If the additional proposal looks good, I'll append it to the KIP. PTAL.

New API: RLMM#nextRemoteLogSegmentMetadataWithTxnIndex

--
Kamal

On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Hi Christo,

Thanks for the review!

Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM helps to avoid the cost of a linear search. With this API, we have to:

1. Maintain one more skip-list [1] for each of the epochs in the partition in RLMM, which might increase the memory usage of the Topic-based RLMM implementation.
   1a) The skip-list will be empty when there are no aborted txn entries for a partition/epoch, which is the predominant case.
   1b) The skip-list will act as a duplicate when *most* of the segments have aborted txn entries; assuming aborted txns are rare, this should be fine.
2. Change the logic to retrieve the aborted txns (we have to query nextRLSMWithTxnIndex for each leader epoch).
3. Accept a divergence from how we retrieve the aborted txn entries from the local log.

The approach looks good to me. If everyone is aligned, then we can proceed to add this API to RLMM.
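Point 1 above describes keeping an extra per-epoch skip-list of only the segments that have aborted-txn entries. A minimal sketch of that idea using `java.util.concurrent.ConcurrentSkipListMap`, assuming a hypothetical `TxnIndexSkipList` class and a simplified `Segment` record keyed by start offset (these are not the Topic-based RLMM's actual data structures):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical per-epoch index of ONLY the segments whose txn index is non-empty.
final class TxnIndexSkipList {

    record Segment(long startOffset, long endOffset) {}

    // leader epoch -> (segment start offset -> segment), ordered by start offset
    private final Map<Integer, ConcurrentSkipListMap<Long, Segment>> byEpoch = new ConcurrentHashMap<>();

    // Segments without aborted-txn entries are never recorded, so the map
    // stays empty for partitions that never aborted a transaction (case 1a).
    void onSegmentAdded(int epoch, Segment segment, boolean txnIdxEmpty) {
        if (txnIdxEmpty) {
            return;
        }
        byEpoch.computeIfAbsent(epoch, e -> new ConcurrentSkipListMap<>())
               .put(segment.startOffset(), segment);
    }

    void onSegmentDeleted(int epoch, long startOffset) {
        NavigableMap<Long, Segment> m = byEpoch.get(epoch);
        if (m != null) {
            m.remove(startOffset);
        }
    }

    // First segment with a non-empty txn index that contains `offset` or starts
    // after it: O(log n) instead of a scan over all segment metadata.
    Optional<Segment> next(int epoch, long offset) {
        NavigableMap<Long, Segment> m = byEpoch.get(epoch);
        if (m == null) {
            return Optional.empty();
        }
        // The segment starting at or before `offset` may still contain it.
        Map.Entry<Long, Segment> floor = m.floorEntry(offset);
        if (floor != null && floor.getValue().endOffset() >= offset) {
            return Optional.of(floor.getValue());
        }
        Map.Entry<Long, Segment> higher = m.higherEntry(offset);
        return higher == null ? Optional.empty() : Optional.of(higher.getValue());
    }
}
```

The lookup assumes segments within an epoch do not overlap; the extra memory is proportional to the number of segments that actually carry a txn index (case 1b).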

Another option I was thinking of is to capture the `lastStableOffsetLag` [2] while rotating the segment. But that is a bigger change that we can take up later.

[1]: https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
[2]: https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432

Thanks,
Kamal

On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com> wrote:
Heya,

Apologies for the delay. I have been thinking about this problem recently as well, and while I believe storing a boolean in the metadata is good, I think we can do better by introducing a new method to the RLMM along the lines of

    Optional<RemoteLogSegmentMetadata> nextRemoteLogSegmentMetadataWithTxnIndex(
            TopicIdPartition topicIdPartition, int epochForOffset, long offset)
            throws RemoteStorageException

This will help plugin implementers to build optimisations such as skip lists, which will give them the next segment quicker than a linear search.
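To illustrate how a broker-side caller might use a per-epoch method shaped like the one proposed above, here is a minimal sketch that visits only segments with a transaction index, one leader epoch at a time. Because the method works on a single epoch and, as noted elsewhere in this thread, an epoch's start offset need not be a segment base offset, each epoch starts at max(fetch offset, epoch start offset). `AbortedTxnCollector`, `NextWithTxnIndex` and the `epochStartOffsets` map are hypothetical stand-ins; the real method also takes a `TopicIdPartition` and may throw `RemoteStorageException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical caller-side sketch; types are simplified stand-ins, not Kafka classes.
final class AbortedTxnCollector {

    record Segment(long startOffset, long endOffset) {}

    // Stand-in for the proposed per-epoch lookup (TopicIdPartition and
    // RemoteStorageException omitted): next segment with a txn index in `epoch`
    // that contains `offset` or lies after it.
    interface NextWithTxnIndex {
        Optional<Segment> next(int epoch, long offset);
    }

    // Returns start offsets of the segments whose txn index would be downloaded.
    // epochStartOffsets: leader epoch -> first offset of that epoch.
    // Assumes each segment is reported under a single epoch.
    static List<Long> segmentsToRead(NextWithTxnIndex lookup,
                                     NavigableMap<Integer, Long> epochStartOffsets,
                                     int startEpoch, long fetchOffset, long upperBoundOffset) {
        List<Long> toRead = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : epochStartOffsets.tailMap(startEpoch, true).entrySet()) {
            if (e.getValue() >= upperBoundOffset) {
                break; // this and later epochs start beyond the range of interest
            }
            long offset = Math.max(fetchOffset, e.getValue());
            Optional<Segment> seg = lookup.next(e.getKey(), offset);
            while (seg.isPresent() && seg.get().startOffset() < upperBoundOffset) {
                toRead.add(seg.get().startOffset());
                seg = lookup.next(e.getKey(), seg.get().endOffset() + 1);
            }
        }
        return toRead;
    }
}
```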

I am keen to hear your thoughts!

Best,
Christo

On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Hi Luke,

Thanks for the review!

> Do you think it would be helpful to store the "least abort start offset in the segment", with -1 meaning no txnIndex? That way we would have a way to know whether we need to fetch this txn index or not.

1. No, this change won't have an effect. To find the upper-bound offset [1], we have to fetch that segment's offset index file. The RemoteIndexCache [2] fetches all 3 index files together and caches them for subsequent use, so the current segment's txn index gets downloaded anyway and this improvement won't help.

2. The reason for choosing a boolean is to keep the change backward compatible. There can be existing RLM events for already-uploaded segments. The default value of `txnIdxEmpty` is false, so the *old* RLM events are assumed to have txn index files, and those files are downloaded if they exist.
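Point 2 relies on the flag defaulting to `false` when an older event does not carry it. A minimal sketch of that decoding rule, assuming a hypothetical `SegmentMetadataEvent` record and modelling the serialized event as a plain `Map` of fields (the real metadata records are versioned and serialized differently):

```java
import java.util.Map;

// Hypothetical decoded form of a segment-metadata event (not Kafka's record classes).
// A Map of fields models "older events simply lack the txnIdxEmpty field".
record SegmentMetadataEvent(long startOffset, long endOffset, boolean txnIdxEmpty) {

    static SegmentMetadataEvent decode(Map<String, Object> fields) {
        long start = (Long) fields.get("startOffset");
        long end = (Long) fields.get("endOffset");
        // Absent flag => false, i.e. "a txn index may exist". This is the safe
        // default: the broker keeps fetching the index (if it exists) instead of
        // skipping segments that might hold aborted transactions.
        boolean txnIdxEmpty = (Boolean) fields.getOrDefault("txnIdxEmpty", Boolean.FALSE);
        return new SegmentMetadataEvent(start, end, txnIdxEmpty);
    }

    // Only segments not known to have an empty txn index need the index download.
    boolean mayNeedTxnIndexFetch() {
        return !txnIdxEmpty;
    }
}
```

Defaulting the other way (`true`) would be unsafe for old events, since a segment with aborted transactions could then be skipped.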

[1]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
[2]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383

Thanks,
Kamal

On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
Hi Kamal,

Sorry for the late review.
Thanks for the KIP; this will improve transactional reads from remote storage.
Overall LGTM, just one minor thought:

Currently, we only store the `TxnIndexEmpty` bool value in the segment metadata.
Do you think it would be helpful to store the "least abort start offset in the segment", with -1 meaning no txnIndex? That way we would have a way to know whether we need to fetch this txn index or not.

Thanks.
Luke

On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Hi all,

If there are no more comments, I'll start a voting thread soon.

Thanks,
Kamal

On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Bumping this thread again for review!

Reduced the scope of the proposal to the minimum. We will be adding only one field (txnIdxEmpty) to the RemoteLogSegmentMetadata event, which is backward compatible. PTAL.

Thanks,
Kamal

On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Bumping this thread for KIP review!

We can go for the simplest solution proposed in this KIP, and it can be improved in a subsequent iteration. PTAL.

Thanks,
Kamal

On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Hi Divij,

Thanks for the review! And sorry for the late reply.

From the UnifiedLog.scala <https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427> doc:

"""
The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
  * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
  * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
  * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
  * beyond the high watermark.
"""

While rolling the active segment to passive, if the LSO equals the HW, then all the messages in that segment are decided, and we can store the `lastStableOffsetLag` as an attribute of the rolled segment. We can then propagate the `lastStableOffsetLag` information in the RemoteLogMetadata events.

While reading the remote log segment, if the `lastStableOffsetLag` is 0, then there is no need to traverse the subsequent segments for aborted transactions. This covers the dominant case where the partition had no transactions at all.

With log compaction, the shrunk segments might get merged. One option is to take the max of `lastStableOffsetLag` and store it in the new LogSegment. Since tiered storage does not support compacted topics (or topics that were previously compacted), we can omit this case.

If this approach looks good, I can update the KIP with the details.
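A minimal sketch of the `lastStableOffsetLag` idea described above. It assumes the lag is `highWatermark - lastStableOffset` at roll time (which is how `UnifiedLog` appears to define it) and, following the proposal, that a lag of 0 lets the reader skip later segments. `LsoLagSketch` and `RolledSegment` are hypothetical names; this is not existing Kafka code.

```java
import java.util.List;

// Hypothetical sketch of the proposal above; not existing Kafka code.
final class LsoLagSketch {

    record RolledSegment(long baseOffset, long lastStableOffsetLag) {}

    // Captured when the active segment is rolled. The lag is assumed to be
    // highWatermark - lastStableOffset; 0 means every offset below the HW is decided.
    static RolledSegment roll(long baseOffset, long highWatermark, long lastStableOffset) {
        return new RolledSegment(baseOffset, highWatermark - lastStableOffset);
    }

    // Per the proposal: with a lag of 0, the reader need not traverse subsequent
    // segments looking for aborted transactions.
    static boolean mustScanLaterSegments(RolledSegment segment) {
        return segment.lastStableOffsetLag() > 0;
    }

    // If segments were ever merged (e.g. by compaction), keep the maximum lag.
    static long mergedLag(List<RolledSegment> merged) {
        return merged.stream().mapToLong(RolledSegment::lastStableOffsetLag).max().orElse(0L);
    }
}
```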

--
Kamal

On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
Hi Kamal

Thanks for the bump. I have been thinking about this passively for the past few days.

The simplest solution is to store a state in the segment-level metadata. The state should specify whether the trx index is empty or not. It would be populated during segment archival. We would then iterate over the metadata for future segments without having to make a remote call to download the trx index itself.

The other solution, storing state at the partition level, wouldn't work, as you mentioned, because we would have to change the state on every mutation to the log, i.e. on expiration of segments and on append.
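The segment-level state described above has to be computed when the segment is archived. Elsewhere in this thread, three ways of computing it are discussed: a null check on the index file, `Files.size()`, and checking whether iterating the index yields anything. A minimal sketch of the size-based variant, covering both a missing file and a zero-length file, using a hypothetical `TxnIndexFile` class (not Kafka's `TransactionIndex`, whose internals may differ):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a file-backed transaction index (not Kafka's TransactionIndex).
final class TxnIndexFile {
    private final Path path; // null when no aborted txn was ever appended for the segment

    TxnIndexFile(Path path) {
        this.path = path;
    }

    // True both when the file is absent and when it exists but has zero length.
    // Callers never see the file itself, so a non-file-based index could implement
    // the same method differently (e.g. by checking whether iteration yields anything).
    boolean isEmpty() {
        if (path == null || !Files.exists(path)) {
            return true;
        }
        try {
            return Files.size(path) == 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// At archival time the flag would be recorded in the segment metadata, e.g.:
// boolean txnIdxEmpty = new TxnIndexFile(indexPathOrNull).isEmpty();
```

The `Files.exists` guard matters because `Files.size` throws an `IOException` (a `NoSuchFileException`) for a missing file.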

I have been thinking about whether we can do something better than the simple solution, hence the delay in replying. Let me share my half-baked train of thought; perhaps you can explore this as well. I have been thinking about using the LSO (last stable offset) to handle the case where the partition never had any transactions. For a partition which never had any transaction, I would assume that the LSO is never initialized (or is equal to the log start offset)? Or is it equal to the HW in that case? This is something that I have yet to verify. If this idea works, then we would not have to iterate through the metadata for the dominant case where the partition had no transactions at all.

--
Divij Vaidya

On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Bump. Please review this proposal.

On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
Divij,

Thanks for the review! Updated the KIP with review comments 1, 2, 3, and 4.

> 4. Potential alternative - Instead of having an algorithm where we traverse across segment metadata and looking for isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across metadata for all segments by using techniques such as skip list etc.

This is a good point for optimizing the scan. We would need to maintain the skip-list for each leader epoch. With unclean leader election, some brokers may not have the complete lineage. This will expand the scope of the work.

In this version, we plan to optimize only for the 2 cases below:

1. A partition does not have a transaction index for any of the uploaded segments.
   The individual log segments' `isTxnIdxEmpty` flags can be reduced to a single flag in RLMM (using the AND operator) that can serve the query "Are all the transaction indexes empty for a partition?". If yes, then we can directly scan the local log for aborted transactions.
2. A partition is produced using the transactional producer.
   The assumption made is that the transaction will either commit or roll back within 15 minutes (default transaction.max.timeout.ms = 15 mins), so we may only have to search a few consecutive remote log segments to collect the aborted transactions.
3. A partition is being produced with both normal and transactional producers. In this case, we will be doing a linear traversal. Maintaining a skip-list might improve the performance, but we delegate the RLMM implementation to users. If implemented incorrectly, it can lead to delivery of aborted transaction records to the consumer.

I notice two drawbacks with the reduction method as proposed in the KIP:

1. Even if only one segment has a transaction index, we have to iterate over all the metadata events.
2. Assume that there are 10 segments and segment-5 has a txn index. Once the first 6 segments are deleted due to a time/size/start-offset breach, we should return `true` for the query "Are all the transaction indexes empty for a partition?", but it will return `false` until the broker gets restarted, and we have to resort to iterating over all the metadata events.

> 5. Potential alternative#2 - We know that we may want the indexes of multiple higher segments. Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.

We can implement parallel-fetch/prefetch once tiered storage is GA.
> >> >>> > > >> >> > > > >>>> > > Since this feature will be useful > >> >>> > > >> >> > > > >>>> > > to prefetch the next remote log segment and > >> it > >> >>> > expands > >> >>> > > >> the > >> >>> > > >> >> > scope > >> >>> > > >> >> > > > of > >> >>> > > >> >> > > > >>>> the > >> >>> > > >> >> > > > >>>> > > work. > >> >>> > > >> >> > > > >>>> > > > >> >>> > > >> >> > > > >>>> > > > 6. Should the proposed API take > "segmentId" > >> >>> as a > >> >>> > > >> >> parameter > >> >>> > > >> >> > > > >>>> instead of > >> >>> > > >> >> > > > >>>> > > "topicIdPartition"? Suggesting because > >> >>> isTxnIdEmpty > >> >>> > is > >> >>> > > >> not > >> >>> > > >> >> a > >> >>> > > >> >> > > > >>>> property of > >> >>> > > >> >> > > > >>>> > a > >> >>> > > >> >> > > > >>>> > > partition, instead it's a property of a > >> specific > >> >>> > > >> segment. > >> >>> > > >> >> > > > >>>> > > > >> >>> > > >> >> > > > >>>> > > We propose to use the `topicIdPartition` in > >> >>> > > >> >> > > > >>>> RemoteLogMetadataManager. > >> >>> > > >> >> > > > >>>> > > The implementation can fold/reduce the > value > >> of > >> >>> the > >> >>> > > >> >> individual > >> >>> > > >> >> > > log > >> >>> > > >> >> > > > >>>> > segment > >> >>> > > >> >> > > > >>>> > > `isTxnIdEmpty` flag. This is added to avoid > >> >>> scanning > >> >>> > > all > >> >>> > > >> >> the > >> >>> > > >> >> > > > >>>> metadata > >> >>> > > >> >> > > > >>>> > > events > >> >>> > > >> >> > > > >>>> > > when the partition does not have a > >> transaction > >> >>> index > >> >>> > > in > >> >>> > > >> >> any of > >> >>> > > >> >> > > the > >> >>> > > >> >> > > > >>>> > > segments. 
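The traversal and the two drawbacks of the reduction method can be illustrated with a small Python model. It is a sketch under stated assumptions, not Kafka's implementation: `AbortedTxn`, `SegmentMetadata`, `collect_aborted` and `CachedPartitionFlag` are hypothetical names, the stop condition (an index entry whose last stable offset reaches the fetch upper bound) is modelled loosely on the broker's transaction-index search, and the `index_fetches` counter stands in for downloading a transaction index from remote storage.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class AbortedTxn:
    first_offset: int
    last_offset: int         # offset of the abort marker
    last_stable_offset: int  # partition LSO when the abort was recorded


@dataclass(frozen=True)
class SegmentMetadata:
    # Hypothetical, simplified stand-in for remote log segment metadata.
    start_offset: int
    end_offset: int
    # Entries of this segment's transaction index (empty if it has none).
    aborted: Tuple[AbortedTxn, ...] = ()

    @property
    def txn_idx_empty(self) -> bool:
        return len(self.aborted) == 0


def collect_aborted(segments: Sequence[SegmentMetadata], fetch_offset: int,
                    upper_bound: int, skip_empty: bool = True
                    ) -> Tuple[List[AbortedTxn], int]:
    """Walk segments from the one containing fetch_offset and return
    (aborted txns overlapping [fetch_offset, upper_bound), index fetches)."""
    found: List[AbortedTxn] = []
    index_fetches = 0
    for seg in segments:
        if seg.end_offset < fetch_offset:
            continue  # segment lies entirely before the fetch position
        if skip_empty and seg.txn_idx_empty:
            continue  # per-segment flag: no remote index download needed
        index_fetches += 1  # stands in for fetching the index remotely
        for txn in seg.aborted:
            if txn.last_offset >= fetch_offset and txn.first_offset < upper_bound:
                found.append(txn)
            if txn.last_stable_offset >= upper_bound:
                return found, index_fetches  # search is complete
    # No entry completed the search, so every later segment was visited.
    return found, index_fetches


def all_txn_indexes_empty(segments: Sequence[SegmentMetadata]) -> bool:
    # Partition-level answer folded from the per-segment flags.
    return all(seg.txn_idx_empty for seg in segments)


class CachedPartitionFlag:
    """Hypothetical flag maintained incrementally from metadata events.
    It turns False when a segment with a txn index is added, but deleting
    that segment later does not turn it back to True (drawback 2)."""

    def __init__(self) -> None:
        self.all_empty = True

    def on_segment_added(self, seg: SegmentMetadata) -> None:
        self.all_empty = self.all_empty and seg.txn_idx_empty

    def on_segment_deleted(self, seg: SegmentMetadata) -> None:
        pass  # restoring True would need a rescan of the remaining segments


# Ten segments of 100 offsets each; only segment-5 has a txn index.
txn = AbortedTxn(first_offset=480, last_offset=520, last_stable_offset=530)
segments = [SegmentMetadata(i * 100, i * 100 + 99, (txn,) if i == 5 else ())
            for i in range(10)]
print(collect_aborted(segments, 450, 500))  # finds txn with one index fetch

flag = CachedPartitionFlag()
for seg in segments:
    flag.on_segment_added(seg)
remaining = segments[6:]  # segment-0 .. segment-5 deleted by retention
for seg in segments[:6]:
    flag.on_segment_deleted(seg)
print(flag.all_empty, all_txn_indexes_empty(remaining))  # False True
```

In this model, with `skip_empty=False` (fetching every index) a read from offset 0 on a partition without transactions downloads all ten indexes, while the per-segment flag avoids every download. The cached flag shows drawback 2: it stays `False` after segment-0 to segment-5 are deleted, whereas recomputing over the remaining segments gives `True`.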
On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

> Hi Kamal,
>
> Thanks for bringing this up. This is a problem worth solving. We have faced this in situations where some Kafka clients default to read_committed mode and end up having high latencies for remote fetches due to this traversal across all segments.
>
> First, some nits to clarify the KIP:
> 1. The motivation should make it clear that traversal of all segments happens only in the worst case. If I am not mistaken (please correct me if wrong), the traversal stops when it has found a segment containing the LSO.
> 2. There is no such thing as a non-txn topic. A transaction may be started on any topic. Perhaps rephrase the statement in the KIP so that it is clear to the reader.
> 3. The hyperlink in "the broker has to traverse all the..." seems incorrect. Did you want to point to https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444 ?
> 4. In the testing section, could we add a test plan? For example, I would add a test that verifies the number of calls made to RLMM. This test would show a higher number of calls before this KIP than after it.
>
> Other thoughts:
> 4. Potential alternative - Instead of having an algorithm where we traverse segment metadata looking for the isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across the metadata of all segments by using techniques such as a skip list.
> 5. Potential alternative#2 - We know that we may want the indexes of multiple higher segments. Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.
> 6. Should the proposed API take "segmentId" as a parameter instead of "topicIdPartition"? I suggest this because isTxnIdEmpty is not a property of a partition; it is a property of a specific segment.
>
> Looking forward to hearing your thoughts about the alternatives. Let's get this fixed.
>
> --
> Divij Vaidya
>
> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
>
> > Hi all,
> >
> > I have opened KIP-1058 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic> to reduce the pressure on remote storage when transactional consumers are reading non-txn topics from remote storage.
> >
> > Feedback and suggestions are welcome.
> >
> > Thanks,
> > Kamal
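Divij's "Potential alternative" (point 4 in the message above) asks for a nextSegmentWithTrxInx() function that implementers could optimize, while a simple plugin can still fall back to a linear scan. A minimal Python sketch of that shape, under stated assumptions: `MetadataManager`, `next_segment_with_txn_index` and the other names are hypothetical and are not the Kafka `RemoteLogMetadataManager` API, segments are assumed not to overlap, and a sorted list with binary search stands in for the skip list mentioned in the thread.

```python
import bisect
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass(frozen=True)
class Segment:
    # Hypothetical, simplified segment metadata.
    start_offset: int
    end_offset: int
    txn_idx_empty: bool


class MetadataManager(ABC):
    """Hypothetical plugin interface (not the real RemoteLogMetadataManager)."""

    @abstractmethod
    def list_segments(self) -> List[Segment]:
        """Return segment metadata sorted by start offset."""

    def next_segment_with_txn_index(self, from_offset: int) -> Optional[Segment]:
        # Default: linear scan, so a plain CRUD-style plugin behaves
        # correctly without implementing any optimization.
        for seg in self.list_segments():
            if seg.end_offset >= from_offset and not seg.txn_idx_empty:
                return seg
        return None


class ListMetadataManager(MetadataManager):
    def __init__(self, segments: Sequence[Segment]) -> None:
        self._segments = sorted(segments, key=lambda s: s.start_offset)

    def list_segments(self) -> List[Segment]:
        return list(self._segments)


class IndexedMetadataManager(ListMetadataManager):
    """Optional optimization: keep only segments that have a txn index and
    binary-search their end offsets. Assumes segments do not overlap, so
    end offsets are sorted in the same order as start offsets."""

    def __init__(self, segments: Sequence[Segment]) -> None:
        super().__init__(segments)
        self._txn_segments = [s for s in self._segments if not s.txn_idx_empty]
        self._txn_end_offsets = [s.end_offset for s in self._txn_segments]

    def next_segment_with_txn_index(self, from_offset: int) -> Optional[Segment]:
        # First txn-index segment whose end offset is >= from_offset.
        i = bisect.bisect_left(self._txn_end_offsets, from_offset)
        return self._txn_segments[i] if i < len(self._txn_segments) else None


# Ten segments of 100 offsets each; segment-3 and segment-7 have txn indexes.
segs = [Segment(i * 100, i * 100 + 99, txn_idx_empty=i not in (3, 7))
        for i in range(10)]
plain, indexed = ListMetadataManager(segs), IndexedMetadataManager(segs)
print(indexed.next_segment_with_txn_index(400).start_offset)  # 700
```

The default keeps the plugin contract simple, and the override is purely an optimization: for any offset it must return the same segment as the linear scan, which makes the two implementations easy to compare in a test.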