[
https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699131#comment-17699131
]
Tzu-Li (Gordon) Tai edited comment on FLINK-31363 at 3/10/23 10:36 PM:
-----------------------------------------------------------------------
Looking at the KafkaProducer code, the {{TransactionManager}} keeps a
{{transactionStarted}} flag that is only set when a record has actually been
sent to the transaction. On {{commitTransaction()}} / {{abortTransaction()}}
API calls on the Java client, if the flag is false, then the client won't
actually send a {{EndTxnRequest}} to the brokers.
So:
{code:java}
producer.beginTransaction();
producer.commitTransaction();
// or
producer.beginTransaction();
producer.abortTransaction(); {code}
the above doesn't throw in normal continuous execution.
It only throws if there was job downtime between the {{beginTransction()}} call
and the commit/abort call (because the flag would have been cleared)
So - I think the correct way to fix this is that we need to additionally
persist the {{transactionStarted}} flag in Flink checkpoints as transaction
metadata, and then set that appropriately when creating the recovery producer
at restore time.
was (Author: tzulitai):
Looking at the KafkaProducer code, the {{TransactionManager}} keeps a
{{transactionStarted}} flag that is only set when a record has actually been
sent to the transaction. On {{commitTransaction()}} / {{abortTransaction()}}
API calls on the Java client, if the flag is false, then the client won't
actually send a {{EndTxnRequest}} to the brokers.
So:
{code:java}
producer.beginTransaction();
producer.commitTransaction();
// or
producer.beginTransaction();
producer.abortTransaction(); {code}
the above doesn't throw in normal continuous execution.
It only throws if there was job downtime between the {{beginTransction()}} call
and the commit/abort call.
So - I think the correct way to fix this is that we need to additionally
persist the {{transactionStarted}} flag in Flink checkpoints as transaction
metadata, and then set that appropriately when creating the recovery producer
at restore time.
> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> --------------------------------------------------------------------
>
> Key: FLINK-31363
> URL: https://issues.apache.org/jira/browse/FLINK-31363
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.0, 1.16.1, 1.18.0
> Reporter: lightzhao
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink starts Exactly once and no data is written to the topic during
> a checkpoint, the transaction commit exception is triggered, with the
> following exception.
> [Transiting to fatal error state due to
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)