[ https://issues.apache.org/jira/browse/FLINK-23896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402188#comment-17402188 ]
Fabian Paul commented on FLINK-23896:
-------------------------------------
I think the observed problem affects not only the KafkaSink but all sinks
implementing FLIP-143. The interface claims that if committing fails, the
committables should be returned so that they can be retried [1]. Unfortunately,
this behaviour was never implemented, and returning failed committables runs
into an UnsupportedOperationException [2]. This means that any error happening
during committing causes potential data loss.
[1] https://github.com/apache/flink/blob/273dce5b030e12dd3d7bebb2f51036a198d07112/flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java#L39
[2] https://github.com/apache/flink/blob/273dce5b030e12dd3d7bebb2f51036a198d07112/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java#L141
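For reference, here is a minimal sketch of the contract documented in [1]: a
Committer is expected to return the committables whose commit failed so that
the runtime can retry them. The committable type and the commitTransaction
helper below are hypothetical illustrations, not Flink code; only the
Committer interface is taken from the referenced commit.
{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.connector.sink.Committer;

// Hypothetical committable type, for illustration only.
class MyCommittable {
    final String transactionalId;

    MyCommittable(String transactionalId) {
        this.transactionalId = transactionalId;
    }
}

// Sketch of a committer that follows the documented contract: committables
// whose commit failed are handed back for a retry instead of being dropped.
class RetryingCommitter implements Committer<MyCommittable> {

    @Override
    public List<MyCommittable> commit(List<MyCommittable> committables)
            throws IOException {
        List<MyCommittable> needRetry = new ArrayList<>();
        for (MyCommittable committable : committables) {
            try {
                commitTransaction(committable); // hypothetical helper
            } catch (IOException e) {
                // Per the Javadoc in [1], return the failed committable for a
                // retry. At the referenced commit, the runtime instead throws
                // an UnsupportedOperationException when this list is
                // non-empty [2].
                needRetry.add(committable);
            }
        }
        return needRetry;
    }

    private void commitTransaction(MyCommittable committable) throws IOException {
        // Commit the external transaction identified by the committable here.
    }

    @Override
    public void close() {}
}
{code}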
> The new KafkaSink drops data if the job fails between checkpoint and transaction commit
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-23896
> URL: https://issues.apache.org/jira/browse/FLINK-23896
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: David Morávek
> Assignee: David Morávek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> * Any time a new *transactional producer* is started,
> "[KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--]"
> needs to be called in order to obtain a new *ProducerId* from the
> *TransactionCoordinator* (a Kafka broker component).
> ** The *ProducerId* is increased every time a new producer with the same
> *TransactionalId* is registered.
> ** Publication of a new ProducerId *FENCES* all prior ProducerIds and
> *ABORTS* all uncommitted transactions associated with them.
> * *KafkaCommitter* uses a separate producer that hacks into Kafka internals
> and resumes the transaction using the epoch and ProducerId, without actually
> obtaining a new ProducerId for itself. This committer uses the *ProducerId*
> stored in the *KafkaCommittable* state to commit the transaction.
> * If a *new producer is started before the transaction is committed* (this
> can happen in some failover scenarios), the ProducerId stored in the state
> is already *FENCED* and the commit fails with a *ProducerFencedException*
> (see the sketch after this description). Because we currently ignore this
> exception (we just log a warning), we effectively *DROP* all uncommitted
> data from the previous checkpoint.
> Basically, any job failure that happens between successfully taking a
> checkpoint and committing the transactions will trigger this behavior.
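> The fencing can be reproduced with the plain Kafka producer API alone. The
> following sketch is illustrative only (broker address, topic name, and the
> transactional id are made-up values): producer "b" re-registers the same
> TransactionalId, which fences producer "a", so the pending commit of "a"
> fails with a ProducerFencedException.
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.ProducerFencedException;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class FencingDemo {
>
>     private static KafkaProducer<String, String> newProducer(String transactionalId) {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
>         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         return new KafkaProducer<>(props);
>     }
>
>     public static void main(String[] args) {
>         KafkaProducer<String, String> a = newProducer("demo-transactional-id");
>         a.initTransactions();  // registers the TransactionalId, obtains ProducerId/epoch
>         a.beginTransaction();
>         a.send(new ProducerRecord<>("demo-topic", "key", "value"));
>
>         // A second producer registering the SAME TransactionalId FENCES
>         // producer "a" and ABORTS its uncommitted transaction.
>         KafkaProducer<String, String> b = newProducer("demo-transactional-id");
>         b.initTransactions();
>
>         try {
>             a.commitTransaction();  // fails; the record sent above is lost
>         } catch (ProducerFencedException e) {
>             // This is the exception that is currently only logged as a
>             // warning, effectively dropping the uncommitted data.
>             System.err.println("Producer was fenced: " + e.getMessage());
>         } finally {
>             a.close();
>             b.close();
>         }
>     }
> }
> {code}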