tzulitai opened a new pull request, #15: URL: https://github.com/apache/flink-connector-kafka/pull/15
This PR fixes FLINK-31363 by changing how empty transactions are handled in `prepareCommit`.

Previously, we always emitted a `KafkaCommittable` for the current transaction so that the `CommitterOperator` could checkpoint it, whether or not that transaction was empty. The problem shows up on restore. To resume and commit the transaction, we recreate a `FlinkKafkaInternalProducer` whose internal `transactionStarted` flag is always set to `true`, so an `EndTxnRequest` is sent to the brokers to commit the transaction. For an empty transaction this fails with an `InvalidTxnState` error, because on the broker side the transaction was never actually started (brokers start a transaction lazily, when the first record is sent).

I've considered two ways to address this:

1. Store the `transactionStarted` flag in the `KafkaCommittable` alongside the other transaction metadata. On restore, set the recreated producer's internal `transactionStarted` flag according to what the checkpoint says.
2. Never checkpoint a `KafkaCommittable` for an empty transaction. Every `KafkaCommittable` restored from a checkpoint then belongs to a transaction that contains data, so it is correct to always set `transactionStarted` to `true` on the recreated producer.

This PR goes with approach 2.
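To make approach 2 concrete, here is a minimal, self-contained sketch under a deliberately simplified model. It is not the connector's code: `FakeBroker`, `Writer`, `Committable` and `commitRestored` are hypothetical names. The broker model assumes only what is described above: a transaction becomes known to the broker when its first record is sent, and ending a transaction the broker never saw start fails with `InvalidTxnState`. The writer tracks whether the current transaction has any records and returns no committable for an empty one. The restore path then always "sends EndTxn", mirroring the recreated producer with `transactionStarted == true`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class EmptyTxnSketch {

    // Hypothetical broker model: a transaction is only known to the broker once
    // its first record has been sent (lazy start on the broker side).
    static final class FakeBroker {
        private final Set<String> ongoing = new HashSet<>();

        void send(String transactionalId, String record) {
            ongoing.add(transactionalId);
        }

        // Models an EndTxnRequest: fails if the broker never saw the transaction start.
        void endTxn(String transactionalId) {
            if (!ongoing.remove(transactionalId)) {
                throw new IllegalStateException("InvalidTxnState for " + transactionalId);
            }
        }
    }

    // Hypothetical, heavily simplified stand-in for KafkaCommittable.
    static final class Committable {
        final String transactionalId;

        Committable(String transactionalId) {
            this.transactionalId = transactionalId;
        }
    }

    // Hypothetical writer that remembers whether the current transaction has data.
    static final class Writer {
        private final FakeBroker broker;
        private final String transactionalId;
        private boolean hasRecords = false;

        Writer(FakeBroker broker, String transactionalId) {
            this.broker = broker;
            this.transactionalId = transactionalId;
        }

        void write(String record) {
            broker.send(transactionalId, record);
            hasRecords = true;
        }

        // Approach 2: an empty transaction yields no committable, so nothing is checkpointed.
        Collection<Committable> prepareCommit() {
            if (!hasRecords) {
                return Collections.emptyList();
            }
            hasRecords = false;
            return List.of(new Committable(transactionalId));
        }
    }

    // Restore path: every committable is ended as if transactionStarted == true.
    // This is only safe because empty transactions were never checkpointed.
    static int commitRestored(FakeBroker broker, Collection<Committable> restored) {
        for (Committable c : restored) {
            broker.endTxn(c.transactionalId);
        }
        return restored.size();
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        Writer writer = new Writer(broker, "txn-0");

        // No records written: no committable, so no EndTxn is sent.
        System.out.println(commitRestored(broker, writer.prepareCommit())); // prints 0

        // One record written: the committable is ended successfully.
        writer.write("hello");
        System.out.println(commitRestored(broker, writer.prepareCommit())); // prints 1
    }
}
```

Approach 1 would instead carry a `transactionStarted` boolean inside the committable and skip the `endTxn` call when it is `false`. The sketch keeps the committable unchanged, which is the trade-off approach 2 makes.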
