[
https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699003#comment-17699003
]
Tzu-Li (Gordon) Tai commented on FLINK-31363:
---------------------------------------------
Thanks for reporting this [~lightzhao], I think this is a valid issue.
Kafka internally doesn't actually consider a transaction started until the
first record is sent to a partition, and then that partition is added to a
transaction.
So, when we start new transactions after every checkpoint in the KafkaSink via
{{producer.beginTransaction()}}, there's actually no explicit txn request
sent to Kafka until the first {{producer.send()}}.
In other words, I think the following would return an InvalidTxnStateException
from Kafka:
{code:java}
producer.beginTransaction();
producer.commitTransaction();
// or
producer.beginTransaction();
producer.abortTransaction();{code}
This can happen if, for example, no data has been sent to Kafka at all within
a checkpoint interval. That may be the case if, e.g., some upstream filtering
operator filtered out all records, or the job simply had no data to process
because no records were written to the Kafka source topic.
It is possible to address this by postponing the
{{producer.beginTransaction()}} call until the first record after a checkpoint
instead of pre-emptively starting a new transaction after the last checkpoint
(like we do now), but that's going to add an if-check to the main record
processing loop that is redundant for all but the first record after each
checkpoint.
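To make the postponed-begin idea concrete, here is a minimal sketch. It does not use the real Kafka client or the actual KafkaSink code: {{TxnProducer}}, {{RecordingProducer}} and {{LazyTxnWriter}} are hypothetical names made up for illustration, and the checkpoint handling is collapsed into a single commit step rather than the sink's real pre-commit / commit sequence.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the three producer calls discussed above. */
interface TxnProducer {
    void beginTransaction();
    void send(String record);
    void commitTransaction();
}

/** Records every call so the behavior can be inspected without a broker. */
class RecordingProducer implements TxnProducer {
    final List<String> calls = new ArrayList<>();

    @Override public void beginTransaction() { calls.add("begin"); }
    @Override public void send(String record) { calls.add("send:" + record); }
    @Override public void commitTransaction() { calls.add("commit"); }
}

/** Hypothetical writer that begins a transaction only when the first record arrives. */
class LazyTxnWriter {
    private final TxnProducer producer;
    private boolean txnOpen = false;

    LazyTxnWriter(TxnProducer producer) {
        this.producer = producer;
    }

    void write(String record) {
        // This is the extra per-record check mentioned above.
        if (!txnOpen) {
            producer.beginTransaction();
            txnOpen = true;
        }
        producer.send(record);
    }

    /** Simplified checkpoint hook; returns true only if a transaction was committed. */
    boolean onCheckpoint() {
        if (!txnOpen) {
            // No record was written since the last checkpoint, so nothing to commit.
            return false;
        }
        producer.commitTransaction();
        txnOpen = false;
        return true;
    }
}
```

With this shape, a checkpoint that saw no records never calls {{beginTransaction()}} or {{commitTransaction()}}, so the empty begin/commit pair shown earlier cannot occur. The cost is the boolean check on every {{write()}}.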
Before deciding on anything, I need to check with our Kafka experts to see if
this makes sense semantically.
Perhaps a different kind of exception could be returned from Kafka to indicate
that the txn wasn't actually started on the server side, and we can then just
safely ignore the exception.
> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> --------------------------------------------------------------------
>
> Key: FLINK-31363
> URL: https://issues.apache.org/jira/browse/FLINK-31363
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.0, 1.16.1, 1.18.0
> Reporter: lightzhao
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink starts Exactly once and no data is written to the topic during
> a checkpoint, the transaction commit exception is triggered, with the
> following exception.
> [Transiting to fatal error state due to
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.]