Heya,

Apologies for the delay. I have been thinking about this problem recently as well, and while I believe storing a boolean in the metadata is good, I think we can do better by introducing a new method to the RLMM along the lines of:

Optional<RemoteLogSegmentMetadata> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition, int epochForOffset, long offset) throws RemoteStorageException

This will help plugin implementers build optimisations such as skip lists, which will give them the next segment more quickly than a linear search.

I am keen to hear your thoughts!

Best,
Christo

On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

> Hi Luke,
>
> Thanks for the review!
>
> > Do you think it is helpful if we store the "least abort start offset in the
> > segment", and -1 means no txnIndex. So that we can have a way to know if we
> > need to fetch this txn index or not.
>
> 1. No, this change won't have an effect. To find the upper-bound offset [1],
>    we have to fetch that segment's offset index file. The RemoteIndexCache [2]
>    fetches all the 3 index files together and caches them for subsequent use,
>    so this improvement won't have an effect as the current segment txn index
>    gets downloaded anyway.
>
> 2. The reason for choosing boolean is to make the change backward compatible.
>    There can be existing RLM events for the uploaded segments. The default
>    value of `txnIdxEmpty` is false so the *old* RLM events are assumed to
>    contain the txn index files and those files are downloaded if they exist.
>
> [1]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
> [2]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383
>
> Thanks,
> Kamal
>
> On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Kamal,
> >
> > Sorry for the late review.
> > Thanks for the KIP, this will improve the transaction reading for remote
> > storage.
> >
> > Overall LGTM, just one minor thought:
> >
> > Currently, we only store the `TxnIndexEmpty` bool value in the segment
> > metadata.
> > Do you think it is helpful if we store the "least abort start offset in the
> > segment", and -1 means no txnIndex. So that we can have a way to know if we
> > need to fetch this txn index or not.
> >
> > Thanks.
> > Luke
> >
> > On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > If there are no more comments, I'll start a voting thread soon.
> > >
> > > Thanks,
> > > Kamal
> > >
> > > On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Bumping this thread again for review!
> > > >
> > > > Reduced the scope of the proposal to minimum. We will be adding only one
> > > > field (txnIdxEmpty) to the RemoteLogSegmentMetadata event which is
> > > > backward compatible. PTAL.
> > > >
> > > > Thanks,
> > > > Kamal
> > > >
> > > > On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <
> > > > kamal.chandraprak...@gmail.com> wrote:
> > > >
> > > >> Bumping this thread for KIP review!
> > > >>
> > > >> We can go for the simplest solution that is proposed in this KIP and
> > > >> it can be improved in the subsequent iteration. PTAL.
> > > >>
> > > >> Thanks,
> > > >> Kamal
> > > >>
> > > >> On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <
> > > >> kamal.chandraprak...@gmail.com> wrote:
> > > >>
> > > >>> Hi Divij,
> > > >>>
> > > >>> Thanks for the review! And, sorry for the late reply.
> > > >>>
> > > >>> From the UnifiedLog.scala
> > > >>> <https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427>
> > > >>> doc:
> > > >>>
> > > >>> """
> > > >>> The last stable offset (LSO) is defined as the first offset such that
> > > >>> all lower offsets have been "decided."
> > > >>>  * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
> > > >>>  * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
> > > >>>  * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
> > > >>>  * beyond the high watermark.
> > > >>> """
> > > >>> While rolling the active segment to passive, if LSO equals to HW, then
> > > >>> all the messages in that segment are decided and we can store the
> > > >>> `lastStableOffsetLag` as an attribute in the rolled segment. We can then
> > > >>> propagate the `lastStableOffsetLag` information in the RemoteLogMetadata
> > > >>> events.
> > > >>>
> > > >>> While reading the remote log segment, if the `lastStableOffsetLag` is 0,
> > > >>> then there is no need to traverse to the subsequent segments for aborted
> > > >>> transactions which covers the case for the dominant case where the
> > > >>> partition had no transactions at all.
> > > >>>
> > > >>> With Log compaction, the shrinked segments might get merged. One option
> > > >>> is to take the max of `lastStableOffsetLag` and store it in the new
> > > >>> LogSegment. Since, the tiered storage does not support compacted topics /
> > > >>> historical compacted topics, we can omit this case.
> > > >>>
> > > >>> If this approach looks good, I can update the KIP with the details.
> > > >>>
> > > >>> --
> > > >>> Kamal
> > > >>>
> > > >>> On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Kamal
> > > >>>>
> > > >>>> Thanks for the bump. I have been thinking about this passively for the
> > > >>>> past few days.
> > > >>>>
> > > >>>> The simplest solution is to store a state at segment level metadata. The
> > > >>>> state should specify whether the trx index is empty or not. It would be
> > > >>>> populated during segment archival. We would then iterate over the
> > > >>>> metadata for future segments without having to make a remote call to
> > > >>>> download the trx index itself.
> > > >>>>
> > > >>>> The other solution for storing state at a partition level wouldn't
> > > >>>> work, as you mentioned, because we will have to change the state on
> > > >>>> every mutation to the log i.e. at expiration of segments and append.
> > > >>>>
> > > >>>> I have been thinking whether we can do something better than the simple
> > > >>>> solution, hence the delay in replying. Let me tell you my half baked
> > > >>>> train of thoughts, perhaps, you can explore this as well. I have been
> > > >>>> thinking about using LSO (last stable offset) to handle the case when
> > > >>>> the partition never had any transactions. For a partition which never
> > > >>>> had any transaction, I would assume that the LSO is never initialized
> > > >>>> (or is equal to log start offset)? Or is it equal to HW in that case?
> > > >>>> This is something that I am yet to verify. If this idea works, then we
> > > >>>> would not have to iterate through the metadata for the dominant case
> > > >>>> where the partition had no transactions at all.
> > > >>>>
> > > >>>> --
> > > >>>> Divij Vaidya
> > > >>>>
> > > >>>> On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
> > > >>>> kamal.chandraprak...@gmail.com> wrote:
> > > >>>>
> > > >>>> > Bump. Please review this proposal.
> > > >>>> >
> > > >>>> > On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
> > > >>>> > kamal.chandraprak...@gmail.com> wrote:
> > > >>>> >
> > > >>>> > > Divij,
> > > >>>> > >
> > > >>>> > > Thanks for the review! Updated the KIP with 1, 2, 3, and 4 review
> > > >>>> > > comments.
> > > >>>> > >
> > > >>>> > > > 4. Potential alternative - Instead of having an algorithm where we
> > > >>>> > > > traverse across segment metadata and looking for isTxnIdxEmpty flag,
> > > >>>> > > > should we directly introduce a nextSegmentWithTrxInx() function? This
> > > >>>> > > > would allow implementers to optimize the otherwise linear scan across
> > > >>>> > > > metadata for all segments by using techniques such as skip list etc.
> > > >>>> > >
> > > >>>> > > This is a good point to optimize the scan. We need to maintain the
> > > >>>> > > skip-list for each leader-epoch. With unclean leader election, some
> > > >>>> > > brokers may not have the complete lineage. This will expand the scope
> > > >>>> > > of the work.
> > > >>>> > >
> > > >>>> > > In this version, we plan to optimize only for the below 2 cases:
> > > >>>> > >
> > > >>>> > > 1. A partition does not have the transaction index for any of the
> > > >>>> > >    uploaded segments.
> > > >>>> > >    The individual log segments `isTxnIdxEmpty` flag can be reduced to a
> > > >>>> > >    single flag in RLMM (using AND operator) that can serve the query -
> > > >>>> > >    "Is all the transaction indexes empty for a partition?".
> > > >>>> > >    If yes, then we can directly scan the local-log for aborted
> > > >>>> > >    transactions.
> > > >>>> > > 2. A partition is produced using the transactional producer. The
> > > >>>> > >    assumption made is that the transaction will either commit/rollback
> > > >>>> > >    within 15 minutes (default transaction.max.timeout.ms = 15 mins),
> > > >>>> > >    possibly we may have to search only a few consecutive remote log
> > > >>>> > >    segments to collect the aborted transactions.
> > > >>>> > > 3. A partition is being produced with both normal and transactional
> > > >>>> > >    producers. In this case, we will be doing linear traversal.
> > > >>>> > >    Maintaining a skip-list might improve the performance but we
> > > >>>> > >    delegate the RLMM implementation to users. If implemented
> > > >>>> > >    incorrectly, then it can lead to delivery of the aborted
> > > >>>> > >    transaction records to the consumer.
> > > >>>> > >
> > > >>>> > > I notice two drawbacks with the reduction method as proposed in the
> > > >>>> > > KIP:
> > > >>>> > >
> > > >>>> > > 1. Even if one segment has a transaction index, then we have to
> > > >>>> > >    iterate over all the metadata events.
> > > >>>> > > 2. Assume that there are 10 segments and segment-5 has a txn index.
> > > >>>> > >    Once the first 6 segments are deleted, due to breach by
> > > >>>> > >    time/size/start-offset, then we should return `true` for "Is all
> > > >>>> > >    the transaction indexes empty for a partition?" query but it will
> > > >>>> > >    return `false` until the broker gets restarted and we have to
> > > >>>> > >    resort to iterate over all the metadata events.
> > > >>>> > >
> > > >>>> > > > 5. Potential alternative#2 - We know that we may want the indexes of
> > > >>>> > > > multiple higher segments. Instead of fetching them sequentially, we
> > > >>>> > > > could implement a parallel fetch or a pre-fetch for the indexes. This
> > > >>>> > > > would help hide the latency of sequentially fetching the trx indexes.
> > > >>>> > >
> > > >>>> > > We can implement parallel-fetch/prefetch once the tiered storage is
> > > >>>> > > GAed. Since this feature will be useful to prefetch the next remote
> > > >>>> > > log segment and it expands the scope of the work.
> > > >>>> > >
> > > >>>> > > > 6. Should the proposed API take "segmentId" as a parameter instead of
> > > >>>> > > > "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property
> > > >>>> > > > of a partition, instead it's a property of a specific segment.
> > > >>>> > >
> > > >>>> > > We propose to use the `topicIdPartition` in RemoteLogMetadataManager.
> > > >>>> > > The implementation can fold/reduce the value of the individual log
> > > >>>> > > segment `isTxnIdEmpty` flag. This is added to avoid scanning all the
> > > >>>> > > metadata events when the partition does not have a transaction index
> > > >>>> > > in any of the segments.
> > > >>>> > >
> > > >>>> > > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com>
> > > >>>> > > wrote:
> > > >>>> > >
> > > >>>> > >> Hi Kamal
> > > >>>> > >>
> > > >>>> > >> Thanks for bringing this up. This is a problem worth solving. We have
> > > >>>> > >> faced this in situations where some Kafka clients default to
> > > >>>> > >> read_committed mode and end up having high latencies for remote
> > > >>>> > >> fetches due to this traversal across all segments.
> > > >>>> > >>
> > > >>>> > >> First some nits to clarify the KIP:
> > > >>>> > >> 1. The motivation should make it clear that traversal of all segments
> > > >>>> > >> is only in the worst case.
> > > >>>> > >> If I am not mistaken (please correct me if wrong), the traversal stops
> > > >>>> > >> when it has found a segment containing LSO.
> > > >>>> > >> 2. There is nothing like a non-txn topic. A transaction may be started
> > > >>>> > >> on any topic. Perhaps, rephrase the statement in the KIP so that it is
> > > >>>> > >> clear to the reader.
> > > >>>> > >> 3. The hyperlink in the "the broker has to traverse all the..." seems
> > > >>>> > >> incorrect. Did you want to point to
> > > >>>> > >> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> > > >>>> > >> ?
> > > >>>> > >> 4. In the testing section, could we add a test plan? For example, I
> > > >>>> > >> would list down adding a test which would verify the number of calls
> > > >>>> > >> made to RLMM. This test would have a higher number of calls earlier
> > > >>>> > >> vs. after this KIP.
> > > >>>> > >>
> > > >>>> > >> Other thoughts:
> > > >>>> > >> 4. Potential alternative - Instead of having an algorithm where we
> > > >>>> > >> traverse across segment metadata and looking for isTxnIdxEmpty flag,
> > > >>>> > >> should we directly introduce a nextSegmentWithTrxInx() function? This
> > > >>>> > >> would allow implementers to optimize the otherwise linear scan across
> > > >>>> > >> metadata for all segments by using techniques such as skip list etc.
> > > >>>> > >> 5. Potential alternative#2 - We know that we may want the indexes of
> > > >>>> > >> multiple higher segments. Instead of fetching them sequentially, we
> > > >>>> > >> could implement a parallel fetch or a pre-fetch for the indexes.
> > > >>>> > >> This would help hide the latency of sequentially fetching the trx
> > > >>>> > >> indexes.
> > > >>>> > >> 6. Should the proposed API take "segmentId" as a parameter instead of
> > > >>>> > >> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property
> > > >>>> > >> of a partition, instead it's a property of a specific segment.
> > > >>>> > >>
> > > >>>> > >> Looking forward to hearing your thoughts about the alternatives. Let's
> > > >>>> > >> get this fixed.
> > > >>>> > >>
> > > >>>> > >> --
> > > >>>> > >> Divij Vaidya
> > > >>>> > >>
> > > >>>> > >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> > > >>>> > >> kamal.chandraprak...@gmail.com> wrote:
> > > >>>> > >>
> > > >>>> > >> > Hi all,
> > > >>>> > >> >
> > > >>>> > >> > I have opened a KIP-1058
> > > >>>> > >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic>
> > > >>>> > >> > to reduce the pressure on remote storage when transactional consumers
> > > >>>> > >> > are reading non-txn topics from remote storage.
> > > >>>> > >> >
> > > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > > >>>> > >> >
> > > >>>> > >> > Feedbacks and suggestions are welcome.
> > > >>>> > >> >
> > > >>>> > >> > Thanks,
> > > >>>> > >> > Kamal
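
P.S. To make the method suggested at the top of this mail a bit more concrete, here is a rough sketch of how a plugin might answer "which is the next segment with a txn index?" without a linear scan. Everything in it is an assumption for illustration only: `TxnSegmentIndex`, `Segment`, `onSegmentUploaded` and `onSegmentDeleted` are hypothetical names, not Kafka APIs, and `Segment` is just a stand-in for RemoteLogSegmentMetadata. It uses the JDK's ConcurrentSkipListMap as the skip list, and it deliberately ignores leader-epoch lineage, which Kamal rightly points out is the hard part.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical in-memory index for one partition; not part of Kafka. */
final class TxnSegmentIndex {

    /** Hypothetical stand-in for RemoteLogSegmentMetadata. */
    static final class Segment {
        final long startOffset;
        final long endOffset;
        final boolean txnIdxEmpty;

        Segment(long startOffset, long endOffset, boolean txnIdxEmpty) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.txnIdxEmpty = txnIdxEmpty;
        }
    }

    // Keyed by segment end offset; holds only segments whose txn index is non-empty.
    private final ConcurrentSkipListMap<Long, Segment> withTxnIndex =
            new ConcurrentSkipListMap<>();

    /** Record a newly uploaded segment; segments with an empty txn index are skipped. */
    void onSegmentUploaded(Segment segment) {
        if (!segment.txnIdxEmpty) {
            withTxnIndex.put(segment.endOffset, segment);
        }
    }

    /** Forget a deleted segment so that stale entries are never returned. */
    void onSegmentDeleted(Segment segment) {
        withTxnIndex.remove(segment.endOffset);
    }

    /**
     * Returns the first segment with a non-empty txn index whose end offset is
     * at or after the given offset, i.e. the segment containing the offset if
     * it qualifies, otherwise the next qualifying segment.
     */
    Optional<Segment> nextWithTxnIndex(long offset) {
        Map.Entry<Long, Segment> entry = withTxnIndex.ceilingEntry(offset);
        return Optional.ofNullable(entry).map(Map.Entry::getValue);
    }
}
```

Because deleted segments are removed from the map, this sketch would also sidestep the second drawback Kamal lists for the reduced flag (a stale answer after the segment holding the txn index is deleted). A real implementation would presumably need one such structure per leader epoch, or some other way to respect the epoch passed to the method.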