[ https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699131#comment-17699131 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-31363 at 3/10/23 10:36 PM:
-----------------------------------------------------------------------

Looking at the KafkaProducer code, the {{TransactionManager}} keeps a 
{{transactionStarted}} flag that is only set once a record has actually been 
sent as part of the transaction. On {{commitTransaction()}} / 
{{abortTransaction()}} API calls on the Java client, if the flag is false, 
the client won't actually send an {{EndTxnRequest}} to the brokers.

So:
{code:java}
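// No records are ever added to either transaction below, so the client's
// transactionStarted flag stays false and no EndTxnRequest is sent on
// commit/abort.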
producer.beginTransaction();
producer.commitTransaction();

// or

producer.beginTransaction();
producer.abortTransaction();
{code}
The above doesn't throw in normal continuous execution.
It only throws if there was job downtime between the {{beginTransaction()}} call 
and the commit/abort call (because the flag would have been cleared).

So I think the correct way to fix this is to additionally persist the 
{{transactionStarted}} flag in Flink checkpoints as transaction metadata, and 
then set it appropriately when creating the recovery producer at restore time.
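
For illustration only, here is a rough sketch of what restoring that flag on the 
recovery producer could look like, assuming we keep using reflection against the 
Kafka client internals (the {{transactionManager}} and {{transactionStarted}} 
field names are client internals and may differ across Kafka client versions):
{code:java}
import java.lang.reflect.Field;

import org.apache.kafka.clients.producer.KafkaProducer;

public class TransactionStartedRestorer {

    // Hypothetical helper, not the actual fix: after resuming the transaction on
    // the recovery producer, re-apply the flag that was persisted in the
    // checkpointed transaction metadata, so that the subsequent commit/abort
    // only results in an EndTxnRequest if the original transaction had records.
    static void restoreTransactionStartedFlag(
            KafkaProducer<?, ?> producer, boolean transactionStarted) throws Exception {
        Field managerField = KafkaProducer.class.getDeclaredField("transactionManager");
        managerField.setAccessible(true);
        Object transactionManager = managerField.get(producer);

        Field startedField =
                transactionManager.getClass().getDeclaredField("transactionStarted");
        startedField.setAccessible(true);
        startedField.set(transactionManager, transactionStarted);
    }
}
{code}
The flag itself would then be snapshotted next to the transactional id / producer 
id / epoch that are already persisted per transaction.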


> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> --------------------------------------------------------------------
>
>                 Key: FLINK-31363
>                 URL: https://issues.apache.org/jira/browse/FLINK-31363
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.0, 1.16.1, 1.18.0
>            Reporter: lightzhao
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink runs with EXACTLY_ONCE semantics and no data is written to the 
> topic during a checkpoint, committing the transaction fails with the 
> following exception:
> [Transiting to fatal error state due to 
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.]
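
For reference, a minimal sink setup for this scenario might look like the 
following sketch (bootstrap servers, topic name, and transactional id prefix 
are placeholders); the failure shows up once the topic sees no records between 
two checkpoints:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch of an EXACTLY_ONCE KafkaSink; if no records reach the sink between two
// checkpoints, the open transaction stays empty, which is the situation above.
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")          // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")       // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("flink-31363-repro")  // placeholder
                .build();
{code}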


