[ 
https://issues.apache.org/jira/browse/FLINK-23896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402127#comment-17402127
 ] 

David Morávek commented on FLINK-23896:
---------------------------------------

[~fabian.paul] We've already discussed this with [~arvid] a little bit. You're 
right that TransactionalId is unique, that something I've missed. Can you 
please see the attached test case that simulates the issue if you can get an 
explanation for the ProducerFencedException?

I'm aware of potential problem with transaction timeouts (TX expiring on the 
broker side).

> The new KafkaSink drops data if job fails between checkpoint and transaction 
> commit.
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-23896
>                 URL: https://issues.apache.org/jira/browse/FLINK-23896
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: David Morávek
>            Assignee: David Morávek
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> * Any time a new *transactional producer* is started, 
> "[KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]";
>  needs to be called in order to obtain new *ProducerId* from 
> *TransactionCoordinator* (Kafka Broker component).
>  ** *ProducerId* is increased every time a new producer with the same 
> *TransactionalId* is registered.
>  ** Publication of new ProducerId *FENCES* all prior ProducerIds and *ABORTS* 
> all of uncommitted transactions assigned with them.
>  * *KafkaCommitter* uses a separate producer, that hacks into Kafka internals 
> and resumes transaction using epoch and producer, without actually assigning 
> a new ProducerId for itself. This committer uses *ProducerId* that is stored 
> in *KafkaComittable* state to commit transaction.
>  * If a *new producer is started before committing the transaction* (this can 
> happen in some failover scenarios), ProducerId stored in the state is already 
> *FENCED* and commit fails with *ProducerFencedException*. Because we 
> currently ignore this exception (we just log a warning), we effectively 
> *DROP* all uncommitted data from the previous checkpoint.
> Basically any job failure that happens between successfully taking a 
> checkpoint and committing transactions, will trigger this behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to