[
https://issues.apache.org/jira/browse/FLINK-23896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402120#comment-17402120
]
Fabian Paul commented on FLINK-23896:
-------------------------------------
[~dmvk] thanks for looking into this. I am not sure your analysis is fully
correct, since in the current implementation it should be guaranteed that a
transaction that is in the committer state is never reused by the KafkaWriter to
write new records. Reusing it would cause the epoch to increase and fence the
committer.
Unfortunately, both the current and the old implementation have the downside
that data might indeed be lost if a transaction is part of a checkpoint but not
yet committed, and the job fails and is not restarted before Kafka aborts the
transaction. We recommend setting a high enough transaction timeout to
alleviate this problem.
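For illustration, a minimal sketch of that recommendation, assuming the KafkaSink builder API (the broker address, topic, and transactional-id prefix are placeholders, and the broker-side transaction.max.timeout.ms, 15 minutes by default, has to be raised to allow such a large producer timeout):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class HighTransactionTimeoutExample {

    public static void main(String[] args) {
        // Give Kafka enough time to keep the pending transaction alive while
        // the job is down; one hour is used here purely as an example value.
        Properties producerProps = new Properties();
        producerProps.setProperty(
                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(60 * 60 * 1000));

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")            // placeholder
                .setKafkaProducerConfig(producerProps)
                // named setDeliveryGuarantee in later releases
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app")            // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")      // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}
{code}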
> The new KafkaSink drops data if job fails between checkpoint and transaction
> commit.
> ------------------------------------------------------------------------------------
>
> Key: FLINK-23896
> URL: https://issues.apache.org/jira/browse/FLINK-23896
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: David Morávek
> Assignee: David Morávek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> * Any time a new *transactional producer* is started,
> [KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]
> needs to be called in order to obtain a new *ProducerId* from the
> *TransactionCoordinator* (a Kafka broker component).
> ** The *ProducerId* is increased every time a new producer with the same
> *TransactionalId* is registered.
> ** Publication of the new ProducerId *FENCES* all prior ProducerIds and *ABORTS*
> all uncommitted transactions associated with them.
> * *KafkaCommitter* uses a separate producer that hacks into Kafka internals
> and resumes the transaction using the epoch and ProducerId, without actually
> registering a new ProducerId for itself. The committer uses the *ProducerId*
> stored in the *KafkaCommittable* state to commit the transaction.
> * If a *new producer is started before committing the transaction* (this can
> happen in some failover scenarios), the ProducerId stored in the state is already
> *FENCED* and the commit fails with a *ProducerFencedException*. Because we
> currently ignore this exception (we just log a warning), we effectively
> *DROP* all uncommitted data from the previous checkpoint (see the sketch right after this list).
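> A minimal, hypothetical sketch of the commit path described in the last two bullets (it is not the actual KafkaCommitter code):
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.common.errors.ProducerFencedException;
>
> // Simplified stand-in for the committer-side commit of a resumed transaction.
> final class FencedCommitSketch {
>
>     // Swallowing ProducerFencedException here means every record written in
>     // the transaction (data already covered by the last checkpoint) is
>     // silently lost.
>     static void commit(KafkaProducer<byte[], byte[]> resumedProducer, String transactionalId) {
>         try {
>             resumedProducer.commitTransaction();
>         } catch (ProducerFencedException e) {
>             // Today only a warning is logged; nothing is retried or failed.
>             System.err.println("Transaction " + transactionalId + " was fenced: " + e);
>         }
>     }
> }
> {code}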
> Basically, any job failure that happens between successfully taking a
> checkpoint and committing its transactions will trigger this behavior.
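> A self-contained sketch of that failover timeline against a local broker (broker address, topic, and TransactionalId are placeholders; this is plain Kafka client code, not the Flink implementation):
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.ProducerFencedException;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class FencingTimelineDemo {
>
>     private static KafkaProducer<String, String> newProducer(String transactionalId) {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
>         return new KafkaProducer<>(props);
>     }
>
>     public static void main(String[] args) {
>         // Producer A stands in for the writer of the failed run: its open
>         // transaction was included in a checkpoint but never committed.
>         KafkaProducer<String, String> producerA = newProducer("flink-sink-0");
>         producerA.initTransactions();
>         producerA.beginTransaction();
>         producerA.send(new ProducerRecord<>("output-topic", "key", "value"));
>
>         // Producer B stands in for a new writer registered with the same
>         // TransactionalId after failover: initTransactions() bumps the epoch,
>         // fences A and aborts A's still-uncommitted transaction.
>         KafkaProducer<String, String> producerB = newProducer("flink-sink-0");
>         producerB.initTransactions();
>
>         try {
>             // The late commit of the old transaction now fails; if the caller
>             // only logs this, the records written above are lost.
>             producerA.commitTransaction();
>         } catch (ProducerFencedException e) {
>             System.err.println("Old producer was fenced, checkpointed data dropped: " + e);
>         }
>     }
> }
> {code}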