[ 
https://issues.apache.org/jira/browse/FLINK-23896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-23896:
-----------------------------------
    Labels: pull-request-available  (was: )

> The new KafkaSink drops data if job fails between checkpoint and transaction 
> commit.
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-23896
>                 URL: https://issues.apache.org/jira/browse/FLINK-23896
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: David Morávek
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> * Any time a new *transactional producer* is started, 
> [KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]
>  needs to be called in order to obtain a new *ProducerId* from the 
> *TransactionCoordinator* (a Kafka broker component).
>  ** The *ProducerId* is increased every time a new producer with the same 
> *TransactionalId* is registered.
>  ** Publication of a new ProducerId *FENCES* all prior ProducerIds and *ABORTS* 
> all uncommitted transactions associated with them.
>  * *KafkaCommitter* uses a separate producer that hacks into Kafka internals 
> and resumes the transaction using the epoch and ProducerId, without actually 
> obtaining a new ProducerId for itself. This committer uses the *ProducerId* 
> stored in the *KafkaCommittable* state to commit the transaction.
>  * If a *new producer is started before the transaction is committed* (this 
> can happen in some failover scenarios), the ProducerId stored in the state is 
> already *FENCED* and the commit fails with *ProducerFencedException*. Because 
> we currently ignore this exception (we just log a warning), we effectively 
> *DROP* all uncommitted data from the previous checkpoint.
> Basically, any job failure that happens between successfully taking a 
> checkpoint and committing the transactions will trigger this behavior.
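
The sequence described above can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: ToyCoordinator, FencedException, commitIgnoringFence and the "sink-0" TransactionalId are hypothetical names invented for this sketch, not Kafka or Flink classes, and the model only mirrors the behavior as the issue describes it (a new ProducerId fences the old one and aborts its open transaction).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of the scenario; none of these types are Kafka or Flink classes. */
final class FencingSketch {

    /** Hypothetical stand-in for ProducerFencedException. */
    static final class FencedException extends RuntimeException {
        FencedException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the broker-side TransactionCoordinator. */
    static final class ToyCoordinator {
        // Latest ProducerId registered for each TransactionalId.
        private final Map<String, Long> latestProducerId = new HashMap<>();
        // Number of records in the open (uncommitted) transaction per TransactionalId.
        private final Map<String, Integer> openTransaction = new HashMap<>();
        private long nextProducerId = 0;
        int committedRecords = 0;

        /** Models initTransactions(): issues a new id, fences older ids, aborts open data. */
        long initTransactions(String transactionalId) {
            long producerId = nextProducerId++;
            latestProducerId.put(transactionalId, producerId);
            openTransaction.remove(transactionalId); // uncommitted records are aborted
            return producerId;
        }

        void send(String transactionalId, long producerId, int records) {
            checkNotFenced(transactionalId, producerId);
            openTransaction.merge(transactionalId, records, Integer::sum);
        }

        void commit(String transactionalId, long producerId) {
            checkNotFenced(transactionalId, producerId);
            Integer records = openTransaction.remove(transactionalId);
            committedRecords += (records == null) ? 0 : records;
        }

        private void checkNotFenced(String transactionalId, long producerId) {
            Long latest = latestProducerId.get(transactionalId);
            if (latest == null || latest != producerId) {
                throw new FencedException("ProducerId " + producerId + " is fenced");
            }
        }
    }

    /** Models a committer that only logs a warning when the stored ProducerId is fenced. */
    static boolean commitIgnoringFence(ToyCoordinator coordinator,
                                       String transactionalId,
                                       long storedProducerId) {
        try {
            coordinator.commit(transactionalId, storedProducerId);
            return true;
        } catch (FencedException e) {
            System.out.println("WARN: ignoring failed commit: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();

        // Before the failure: the writer opens a transaction and writes three records.
        long storedProducerId = coordinator.initTransactions("sink-0");
        coordinator.send("sink-0", storedProducerId, 3);
        // The checkpoint succeeds; storedProducerId is what the committable state holds.

        // The job fails before the commit; the restarted writer registers a new producer.
        coordinator.initTransactions("sink-0");

        // The committer now resumes with the stored, already fenced ProducerId.
        boolean committed = coordinator.commitIgnoringFence(coordinator, "sink-0", storedProducerId);
        System.out.println("committed=" + committed
                + ", committedRecords=" + coordinator.committedRecords);
    }
}
```

In this model the three records written before the checkpoint never reach the committed count, because the only reaction to the fenced commit is a logged warning. If the commit runs before the second initTransactions call, the same records are committed normally.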



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
