[
https://issues.apache.org/jira/browse/FLINK-23896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-23896:
-----------------------------------
Labels: pull-request-available (was: )
> The new KafkaSink drops data if job fails between checkpoint and transaction
> commit.
> ------------------------------------------------------------------------------------
>
> Key: FLINK-23896
> URL: https://issues.apache.org/jira/browse/FLINK-23896
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: David Morávek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> * Any time a new *transactional producer* is started,
> "[KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]"
> must be called to obtain a new *ProducerId* from the
> *TransactionCoordinator* (a Kafka broker component).
> ** *ProducerId* is increased every time a new producer with the same
> *TransactionalId* is registered.
> ** Publishing a new ProducerId *FENCES* all prior ProducerIds and *ABORTS*
> all uncommitted transactions associated with them.
> * *KafkaCommitter* uses a separate producer that hacks into Kafka internals
> and resumes the transaction using the epoch and ProducerId, without actually
> obtaining a new ProducerId for itself. This committer uses the *ProducerId*
> stored in the *KafkaCommittable* state to commit the transaction.
> * If a *new producer is started before the transaction is committed* (this
> can happen in some failover scenarios), the ProducerId stored in the state is
> already *FENCED* and the commit fails with *ProducerFencedException*. Because
> we currently ignore this exception (we only log a warning), we effectively
> *DROP* all uncommitted data from the previous checkpoint.
> In short, any job failure that happens between successfully taking a
> checkpoint and committing its transactions will trigger this behavior.
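
The failure sequence described above can be replayed with a small toy model. This is only a sketch under stated assumptions: `ToyCoordinator`, `FencedException`, `runScenario`, and the `sink-0` TransactionalId are hypothetical names invented for illustration, the model collapses ProducerId and epoch into a single counter, and none of it is the Kafka client or Flink API. It only illustrates why swallowing the fencing error loses the records written before the checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of transactional fencing. All names are hypothetical; NOT the Kafka or Flink API. */
final class FencingSketch {

    /** Stand-in for Kafka's ProducerFencedException. */
    static final class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    /** Greatly simplified stand-in for the broker-side TransactionCoordinator. */
    static final class ToyCoordinator {
        private final Map<String, Integer> currentEpoch = new HashMap<>();
        private final Map<String, List<String>> openTransaction = new HashMap<>();
        private final List<String> committed = new ArrayList<>();

        /** Registers a producer: bumps the epoch (fencing older ones) and aborts the open transaction. */
        int initTransactions(String transactionalId) {
            openTransaction.remove(transactionalId);
            return currentEpoch.merge(transactionalId, 1, Integer::sum);
        }

        /** Buffers a record in the open transaction, unless the caller's epoch is stale. */
        void send(String transactionalId, int epoch, String record) {
            ensureNotFenced(transactionalId, epoch);
            openTransaction.computeIfAbsent(transactionalId, id -> new ArrayList<>()).add(record);
        }

        /** Makes the buffered records visible, unless the caller's epoch is stale. */
        void commit(String transactionalId, int epoch) {
            ensureNotFenced(transactionalId, epoch);
            List<String> records = openTransaction.remove(transactionalId);
            if (records != null) {
                committed.addAll(records);
            }
        }

        List<String> committedRecords() {
            return new ArrayList<>(committed);
        }

        private void ensureNotFenced(String transactionalId, int epoch) {
            Integer current = currentEpoch.get(transactionalId);
            if (current == null || current != epoch) {
                throw new FencedException("epoch " + epoch + " is fenced for " + transactionalId);
            }
        }
    }

    /** Replays the scenario and returns the records a reader would see afterwards. */
    static List<String> runScenario(boolean newProducerBeforeCommit) {
        ToyCoordinator coordinator = new ToyCoordinator();
        String transactionalId = "sink-0";

        // Writer: register, then write the records belonging to one checkpoint.
        int epoch = coordinator.initTransactions(transactionalId);
        coordinator.send(transactionalId, epoch, "a");
        coordinator.send(transactionalId, epoch, "b");

        // Checkpoint succeeds: the committable state remembers (transactionalId, epoch).
        int epochInState = epoch;

        if (newProducerBeforeCommit) {
            // Failover: a fresh producer registers under the same TransactionalId.
            coordinator.initTransactions(transactionalId);
        }

        try {
            // Committer: resume with the stored epoch instead of registering anew.
            coordinator.commit(transactionalId, epochInState);
        } catch (FencedException e) {
            // Mirrors the behavior described in the report: warn and carry on.
            System.err.println("WARN: commit failed, continuing: " + e.getMessage());
        }
        return coordinator.committedRecords();
    }
}
```

In this model `FencingSketch.runScenario(false)` returns both records, while `FencingSketch.runScenario(true)` returns an empty list: the commit is rejected as fenced, the warning is printed, and the checkpointed records never become visible.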