BewareMyPower commented on PR #23940:
URL: https://github.com/apache/pulsar/pull/23940#issuecomment-2642000795
@dao-jun I found this issue while implementing Kafka transactions. In
Kafka, `analyzeAndValidateProducerState` is called before writing messages.
It updates a map from the next message's offset (the key) to an object, and
the key is removed after the record batch is written.
For example, with 3 ongoing writes, the flow looks like the following
pseudocode:
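```java
for (int i = 0; i < 3; i++) {
    // LEO (log end offset): the offset the next record batch should get
    long nextOffset = interceptor.getIndex() + 1;
    // record the ongoing txn of this record batch
    ongoingTxns.put(nextOffset, txn);
    // the interceptor's index is advanced later, on the ML's executor thread
    asyncAddEntry().thenAccept(__ -> ongoingTxns.remove(nextOffset));
}
```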
Assuming each record batch contains only 1 message, `ongoingTxns` should
ideally hold 3 keys (0, 1, 2) after these 3 iterations. However, since
`asyncAddEntry` switches to the ML's executor thread to call the interceptor's
`beforeAddEntry` method, `nextOffset` may be 0 in all 3 iterations. In that
case, all 3 `put` calls target the same key and `ongoingTxns.remove(nextOffset)`
takes effect only once.
In the actual code, this could then hit
[this line](https://github.com/streamnative/kop/blob/14f9832d8a437c5b717e3c74fd7e4cda9810aff7/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java#L312)
of `ProducerStateManager`.
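The interleaving above can be reproduced deterministically with a small toy model. This is only a sketch under stated assumptions: `RaceDemo`, `FakeLedger` and `collectNextOffsets` are hypothetical names, not Pulsar or KoP APIs. The ledger is modeled as a single-threaded executor whose task advances an index (standing in for the interceptor's `beforeAddEntry`), and a latch holds that executor so that every `getIndex()` read in the loop happens before any increment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class RaceDemo {

    // Hypothetical stand-in for a managed ledger: the index is advanced on the
    // ledger's own executor thread, not on the caller's thread.
    static final class FakeLedger implements AutoCloseable {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicLong index = new AtomicLong(-1); // index of the last added entry

        long getIndex() {
            return index.get();
        }

        CompletableFuture<Void> asyncAddEntry() {
            // Models beforeAddEntry(): runs later on the executor, so the caller
            // may read a stale index in the meantime.
            return CompletableFuture.runAsync(index::incrementAndGet, executor);
        }

        @Override
        public void close() {
            executor.shutdown();
        }
    }

    // Returns the nextOffset keys computed by the loop.
    // pipelined = true: issue all 3 writes without waiting (executor held).
    // pipelined = false: wait for each write before the next iteration.
    static List<Long> collectNextOffsets(boolean pipelined) throws Exception {
        try (FakeLedger ledger = new FakeLedger()) {
            CountDownLatch gate = new CountDownLatch(1);
            if (pipelined) {
                // Block the executor so no index update runs until the loop ends.
                ledger.executor.submit(() -> { gate.await(); return null; });
            }
            Map<Long, String> ongoingTxns = new ConcurrentHashMap<>();
            List<Long> keys = new ArrayList<>();
            List<CompletableFuture<Void>> writes = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                long nextOffset = ledger.getIndex() + 1; // LEO as seen by the caller
                keys.add(nextOffset);
                ongoingTxns.put(nextOffset, "txn-" + i);
                CompletableFuture<Void> write = ledger.asyncAddEntry()
                        .thenAccept(__ -> ongoingTxns.remove(nextOffset));
                if (pipelined) {
                    writes.add(write);
                } else {
                    write.get(); // the index is up to date before the next read
                }
            }
            gate.countDown(); // release the executor (no-op in sequential mode)
            CompletableFuture.allOf(writes.toArray(new CompletableFuture[0])).get();
            return keys;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sequential: " + collectNextOffsets(false)); // [0, 1, 2]
        System.out.println("pipelined:  " + collectNextOffsets(true));  // [0, 0, 0]
    }
}
```

In this toy model, waiting for each write restores the expected keys but serializes the writes, which defeats the purpose of pipelining them.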
The details above are beyond the scope of this PR but could help you
understand the motivation.