dao-jun commented on PR #23940:
URL: https://github.com/apache/pulsar/pull/23940#issuecomment-2643481585
> @dao-jun I found this issue when I implemented Kafka transactions. In
> Kafka, `analyzeAndValidateProducerState` is called before writing messages.
> This method updates a map from the next message's offset (key) to an
> object, and the key is removed after the record batch is written.
>
> For example, assuming there are 3 ongoing writes, the flow looks like the
> following pseudo code:
>
> ```java
> for (int i = 0; i < 3; i++) {
>     long nextOffset = interceptor.getIndex() + 1; // LEO
>     ongoingTxns.put(nextOffset, txn); // record the ongoing txn of this record batch
>     asyncAddEntry().thenAccept(__ -> ongoingTxns.remove(nextOffset));
> }
> ```
>
> Assuming each record batch has only 1 message, ideally, after issuing these
> 3 writes, `ongoingTxns` will have 3 keys (0, 1, 2). However, since
> `asyncAddEntry` switches to the ML's executor thread to call the interceptor's
> `beforeAddEntry` method, the index may not have been updated yet when the
> next iteration reads it. There is a chance that `nextOffset` is 0 in all 3
> iterations, so `ongoingTxns.remove(nextOffset)` takes effect only once.
> It could then hit
> [here](https://github.com/streamnative/kop/blob/14f9832d8a437c5b717e3c74fd7e4cda9810aff7/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java#L312)
> in the actual code, which differs considerably from the OSS KoP.
>
> The details above are beyond the scope of this PR but could help you
> understand the motivation.
Thanks for your explanation!
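The race in the quoted explanation can be reproduced deterministically with a small Java sketch. Everything in it is hypothetical: `Interceptor`, `QueuedExecutor`, `Scenario` and its `asyncAddEntry()` are simplified stand-ins loosely modeled on the managed ledger and its interceptor, not the real Pulsar API. The queued executor defers every task until it is explicitly run, which models the worst-case interleaving where no `beforeAddEntry` call happens before the loop finishes. The index is assumed to start at -1 so that the first LEO is 0.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class StaleIndexRace {
    // Hypothetical interceptor: tracks the index of the last written message (assumed to start at -1).
    static final class Interceptor {
        private long index = -1;
        long getIndex() { return index; }
        void beforeAddEntry(int numberOfMessages) { index += numberOfMessages; }
    }

    // Hypothetical stand-in for the ML executor: tasks are queued and only run when asked to.
    static final class QueuedExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        @Override public void execute(Runnable task) { tasks.add(task); }
        boolean runOne() {
            Runnable task = tasks.poll();
            if (task == null) return false;
            task.run();
            return true;
        }
        int pending() { return tasks.size(); }
        void drain() { while (runOne()) { } }
    }

    static final class Scenario {
        final Interceptor interceptor = new Interceptor();
        final QueuedExecutor mlExecutor = new QueuedExecutor();
        final Map<Long, String> ongoingTxns = new ConcurrentHashMap<>();

        // Hypothetical asyncAddEntry: the interceptor is only invoked later, on the ML executor.
        CompletableFuture<Void> asyncAddEntry() {
            CompletableFuture<Void> future = new CompletableFuture<>();
            mlExecutor.execute(() -> {
                interceptor.beforeAddEntry(1); // each record batch holds 1 message
                future.complete(null);         // runs the registered thenAccept callback on this thread
            });
            return future;
        }

        // Mirrors the quoted pseudo code.
        void issueWrites(int n) {
            for (int i = 0; i < n; i++) {
                long nextOffset = interceptor.getIndex() + 1; // stale: beforeAddEntry has not run yet
                ongoingTxns.put(nextOffset, "txn-" + i);
                asyncAddEntry().thenAccept(__ -> ongoingTxns.remove(nextOffset));
            }
        }
    }

    public static void main(String[] args) {
        Scenario s = new Scenario();
        s.issueWrites(3);
        // All 3 writes computed the same key, so there is 1 key instead of 0, 1, 2.
        System.out.println("keys while 3 writes are pending: " + s.ongoingTxns.keySet());
        s.mlExecutor.runOne();
        // The first completion removes key 0 although 2 batches are still in flight.
        System.out.println("keys after 1st write completes (" + s.mlExecutor.pending()
                + " still pending): " + s.ongoingTxns.keySet());
        s.mlExecutor.drain();
        System.out.println("interceptor index after all writes: " + s.interceptor.getIndex());
    }
}
```

Running `main` prints `[0]` for the pending keys, then an empty key set while 2 writes are still pending, and finally an index of 2: the interceptor itself ends up correct, but the offsets read before `beforeAddEntry` ran were all stale.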
--
This is an automated message from the Apache Git Service.