[ 
https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699003#comment-17699003
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-31363:
---------------------------------------------

Thanks for reporting this [~lightzhao], I think this is a valid issue.

Kafka internally doesn't actually consider a transaction started until the 
first record is sent to a partition, and then that partition is added to a 
transaction.
So, when we start new transactions after every checkpoint in the KafkaSink via 
{{producer.beginTransaction()}}, there's actually no explicit txn request 
sent to Kafka until the first {{producer.send()}}.

In other words, I think the following would return an InvalidTxnStateException 
from Kafka:

{code:java}
producer.beginTransaction();
producer.commitTransaction();

// or

producer.beginTransaction();
producer.abortTransaction();{code}
This can happen if, for example, no data has been sent to Kafka at all within 
a checkpoint. That may be the case if, e.g., some upstream filtering operator 
filtered out all records, or the job simply had no data to process because 
no records were written to the Kafka source topic.

It is possible to address this by postponing the 
{{producer.beginTransaction()}} call until the first record after a checkpoint 
instead of pre-emptively starting a new transaction after the last checkpoint 
(like we do now), but that's going to add an extra if-check to the main 
record processing loop.
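
To make the trade-off concrete, here is a minimal sketch of the deferred 
approach. It is not the actual KafkaSink code: {{TxnProducer}}, 
{{LazyTxnWriter}} and {{RecordingProducer}} are hypothetical names, and the 
interface is only a stand-in for the transactional calls on the Kafka producer 
so that the sketch runs without a broker. It also collapses Flink's pre-commit 
and commit phases into a single {{onCheckpoint()}} call for brevity.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class LazyTransactionSketch {

    /** Hypothetical stand-in for the transactional calls of a Kafka producer. */
    interface TxnProducer {
        void beginTransaction();
        void send(String record);
        void commitTransaction();
    }

    /** Hypothetical writer that opens a transaction only when the first record arrives. */
    static final class LazyTxnWriter {
        private final TxnProducer producer;
        private boolean txnOpen = false;

        LazyTxnWriter(TxnProducer producer) {
            this.producer = producer;
        }

        void write(String record) {
            // This is the extra per-record check mentioned above.
            if (!txnOpen) {
                producer.beginTransaction();
                txnOpen = true;
            }
            producer.send(record);
        }

        /** Returns true if a transaction was committed, false if there was nothing to commit. */
        boolean onCheckpoint() {
            if (!txnOpen) {
                // No record was sent since the last checkpoint, so no commit is attempted.
                return false;
            }
            producer.commitTransaction();
            txnOpen = false;
            return true;
        }
    }

    /** Records the calls it receives so the behavior can be inspected without a broker. */
    static final class RecordingProducer implements TxnProducer {
        final List<String> calls = new ArrayList<>();

        @Override
        public void beginTransaction() {
            calls.add("begin");
        }

        @Override
        public void send(String record) {
            calls.add("send:" + record);
        }

        @Override
        public void commitTransaction() {
            calls.add("commit");
        }
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer();
        LazyTxnWriter writer = new LazyTxnWriter(producer);

        writer.onCheckpoint(); // empty checkpoint: the producer is never touched
        writer.write("a");     // first record: begin, then send
        writer.onCheckpoint(); // commit the open transaction

        System.out.println(producer.calls);
    }
}
{code}
With this shape, a checkpoint in which no record was written never reaches 
{{commitTransaction()}}, and the cost is the {{txnOpen}} check on every 
{{write()}}.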

Before deciding on anything, I need to check with our Kafka experts to see if 
this makes sense semantically.
Perhaps a different kind of exception could be returned from Kafka to indicate 
that the txn wasn't actually started on the server side, and we can then just 
safely ignore the exception.

> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> --------------------------------------------------------------------
>
>                 Key: FLINK-31363
>                 URL: https://issues.apache.org/jira/browse/FLINK-31363
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.0, 1.16.1, 1.18.0
>            Reporter: lightzhao
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink runs with EXACTLY_ONCE semantics and no data is written to the 
> topic during a checkpoint, the transaction commit fails with the following 
> exception.
> [Transiting to fatal error state due to 
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
