tzulitai opened a new pull request, #15:
URL: https://github.com/apache/flink-connector-kafka/pull/15

   This PR fixes FLINK-31363 by changing how we handle empty transactions on 
`prepareCommit`.
   
   Previously, regardless of whether the current transaction was empty or 
non-empty, we always emitted a `KafkaCommittable` for it to be checkpointed by 
the `CommitterOperator`. The issue: on restore, when we resume the transaction 
and commit it, we recreate a `FlinkKafkaInternalProducer` that always has the 
internal `transactionStarted` flag set to `true`, which means that an 
`EndTxnRequest` is sent to the brokers to commit the transaction. If the 
transaction was empty, this results in an `InvalidTxnState` error, since on the 
broker side the transaction was never actually started (brokers start 
transactions lazily, when the first record is sent).
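   The failure can be reproduced with a small toy model. `ToyBroker` and 
`ToyProducer` are hypothetical stand-ins, not the Kafka client or 
`FlinkKafkaInternalProducer`: the broker only learns about a transaction when 
the first record arrives, the producer mirrors the `transactionStarted` flag 
described above, and the thrown exception merely mimics `InvalidTxnState`.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the restore-then-commit failure for an empty transaction.
// ToyBroker and ToyProducer are hypothetical stand-ins, NOT the Kafka client
// or FlinkKafkaInternalProducer; the exception only mimics INVALID_TXN_STATE.
public class EmptyTxnRestoreDemo {

    /** Broker side: a transaction only exists once its first record arrives. */
    static final class ToyBroker {
        private final Set<String> ongoingTxns = new HashSet<>();

        void produce(String transactionalId, String record) {
            ongoingTxns.add(transactionalId); // transaction is started lazily here
        }

        /** Models handling an EndTxnRequest that commits the transaction. */
        void endTxn(String transactionalId) {
            if (!ongoingTxns.remove(transactionalId)) {
                throw new IllegalStateException(
                        "InvalidTxnState: no ongoing transaction for " + transactionalId);
            }
        }
    }

    /** Client side: mirrors the internal transactionStarted flag described above. */
    static final class ToyProducer {
        private final ToyBroker broker;
        private final String transactionalId;
        private boolean transactionStarted;

        ToyProducer(ToyBroker broker, String transactionalId, boolean transactionStarted) {
            this.broker = broker;
            this.transactionalId = transactionalId;
            this.transactionStarted = transactionStarted;
        }

        void send(String record) {
            broker.produce(transactionalId, record);
            transactionStarted = true;
        }

        void commitTransaction() {
            // An EndTxnRequest goes out only if the client believes the txn started.
            if (transactionStarted) {
                broker.endTxn(transactionalId);
            }
            transactionStarted = false;
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // A committable for "txn-1" was checkpointed although no record was sent.
        // On restore the producer is recreated with transactionStarted = true.
        ToyProducer restored = new ToyProducer(broker, "txn-1", true);
        try {
            restored.commitTransaction();
            System.out.println("committed");
        } catch (IllegalStateException e) {
            System.out.println("commit failed: " + e.getMessage());
        }
    }
}
```

   A producer that actually sent a record before committing goes through the 
same `endTxn` path without error, which is why only empty transactions are 
affected.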
   
   I've considered two possible ways to address this:
   
   1. Store the `transactionStarted` flag in a `KafkaCommittable` alongside 
other txn metadata. Then, on restore, we set the recreated producer's internal 
`transactionStarted` flag according to what the checkpoint says.
   2. Never checkpoint a `KafkaCommittable` if the transaction is empty. In this 
case, any `KafkaCommittable` restored from a checkpoint always refers to a 
transaction that contains data, and therefore it is correct to always set the 
internal `transactionStarted` flag to `true` on the recreated producer.
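   For comparison, approach 1 could look roughly like the sketch below. 
`CommittableWithFlag` and `RestoredProducer` are illustrative names and fields, 
not Flink classes; the point is only that the restored flag comes from the 
checkpoint instead of being hard-coded to `true`. It uses records, so it assumes 
Java 16 or newer.

```java
// Hypothetical sketch of approach 1: the checkpointed committable carries the
// transactionStarted flag, and the restored producer takes its flag from it.
// CommittableWithFlag and RestoredProducer are illustrative, not Flink classes.
public class Approach1Sketch {

    /** Illustrative committable: txn metadata plus the transactionStarted flag. */
    record CommittableWithFlag(
            String transactionalId, long producerId, short epoch, boolean transactionStarted) {}

    /** Illustrative stand-in for the producer recreated on restore. */
    static final class RestoredProducer {
        final String transactionalId;
        boolean transactionStarted;

        RestoredProducer(String transactionalId) {
            this.transactionalId = transactionalId;
        }
    }

    static RestoredProducer restore(CommittableWithFlag committable) {
        RestoredProducer producer = new RestoredProducer(committable.transactionalId());
        // Use the checkpointed value instead of unconditionally setting true.
        producer.transactionStarted = committable.transactionStarted();
        return producer;
    }

    /** Committing sends an EndTxnRequest only when the flag is set. */
    static boolean wouldSendEndTxn(RestoredProducer producer) {
        return producer.transactionStarted;
    }

    public static void main(String[] args) {
        var empty = new CommittableWithFlag("txn-1", 42L, (short) 0, false);
        var nonEmpty = new CommittableWithFlag("txn-2", 43L, (short) 0, true);
        System.out.println(wouldSendEndTxn(restore(empty)));    // false
        System.out.println(wouldSendEndTxn(restore(nonEmpty))); // true
    }
}
```

   The trade-off of this design is that it changes what gets checkpointed, so 
the committable's serialized form would have to grow an extra field.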
   
   This PR chooses to go with approach 2.
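   A minimal sketch of the invariant approach 2 relies on, assuming a writer that 
counts records per transaction. `ToyTransactionalWriter` and `Committable` are 
hypothetical names, not the connector's classes, and the sketch does not model 
what the real writer does with the unused empty transaction; it only shows 
that `prepareCommit` emits nothing when no record was written. It uses records, 
so it assumes Java 16 or newer.

```java
import java.util.List;

// Hypothetical sketch of approach 2: a transactional writer emits a committable
// at checkpoint time only if its current transaction received at least one
// record. ToyTransactionalWriter and Committable are illustrative names.
public class Approach2Sketch {

    /** Illustrative committable holding only the transactional id. */
    record Committable(String transactionalId) {}

    static final class ToyTransactionalWriter {
        private final String transactionalId;
        private long recordsInCurrentTxn;

        ToyTransactionalWriter(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        void write(String record) {
            // In Kafka, the first record sent is what starts the txn on the broker.
            recordsInCurrentTxn++;
        }

        /** Called on checkpoint; returns what would be handed to the committer. */
        List<Committable> prepareCommit() {
            if (recordsInCurrentTxn == 0) {
                // Empty transaction: emit nothing, so no restored committable can
                // point at a transaction the broker never started.
                return List.of();
            }
            recordsInCurrentTxn = 0;
            return List.of(new Committable(transactionalId));
        }
    }

    public static void main(String[] args) {
        ToyTransactionalWriter writer = new ToyTransactionalWriter("txn-1");
        System.out.println(writer.prepareCommit().size()); // 0: checkpoint before any write
        writer.write("record-a");
        System.out.println(writer.prepareCommit().size()); // 1: checkpoint after one write
    }
}
```

   With this invariant in place, every committable that reaches the committer 
belongs to a transaction the broker has started, so setting 
`transactionStarted` to `true` on each recreated producer stays correct and the 
checkpointed data does not need an extra field.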


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
