[ 
https://issues.apache.org/jira/browse/FLINK-23896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402188#comment-17402188
 ] 

Fabian Paul commented on FLINK-23896:
-------------------------------------

I think the seen problem is not only affecting KafkaSink but all sinks 
implementing FLIP-143. The interface claims that if committing fails the 
committables should be returned [1]. Unfortunately, this behaviour was not 
implemented, and returning failed committables runs into an 
UnsupportedOperationException[2]. This means all errors happening during 
committing cause potential data loss.

 

[1] 
[https://github.com/apache/flink/blob/273dce5b030e12dd3d7bebb2f51036a198d07112/flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java#L39]

[2] 
https://github.com/apache/flink/blob/273dce5b030e12dd3d7bebb2f51036a198d07112/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java#L141

> The new KafkaSink drops data if job fails between checkpoint and transaction 
> commit.
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-23896
>                 URL: https://issues.apache.org/jira/browse/FLINK-23896
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: David Morávek
>            Assignee: David Morávek
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> * Any time a new *transactional producer* is started, 
> "[KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]";
>  needs to be called in order to obtain new *ProducerId* from 
> *TransactionCoordinator* (Kafka Broker component).
>  ** *ProducerId* is increased every time a new producer with the same 
> *TransactionalId* is registered.
>  ** Publication of new ProducerId *FENCES* all prior ProducerIds and *ABORTS* 
> all of uncommitted transactions assigned with them.
>  * *KafkaCommitter* uses a separate producer, that hacks into Kafka internals 
> and resumes transaction using epoch and producer, without actually assigning 
> a new ProducerId for itself. This committer uses *ProducerId* that is stored 
> in *KafkaComittable* state to commit transaction.
>  * If a *new producer is started before committing the transaction* (this can 
> happen in some failover scenarios), ProducerId stored in the state is already 
> *FENCED* and commit fails with *ProducerFencedException*. Because we 
> currently ignore this exception (we just log a warning), we effectively 
> *DROP* all uncommitted data from the previous checkpoint.
> Basically any job failure that happens between successfully taking a 
> checkpoint and committing transactions, will trigger this behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to