[
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai updated FLINK-5701:
---------------------------------------
Summary: FlinkKafkaProducer should check asyncException on checkpoints
(was: FlinkKafkaProducer violates at-least-once by not handling failed records)
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer keeps a {{pendingRecords}} count that is incremented on each
> {{invoke()}} and decremented in each callback; it is used to decide whether the
> producer still needs to wait for pending callbacks when a checkpoint is taken.
> On each checkpoint, the checkpoint should only be considered successful if,
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, only {{pendingRecords}} is checked).
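> A minimal sketch of this flush-and-check behavior, assuming simplified stand-in
> names ({{pendingRecords}}, {{asyncException}}, {{snapshotState()}}) rather than
> the actual {{FlinkKafkaProducerBase}} code:
> {code:java}
> // Sketch only: simplified stand-in for the flush-on-checkpoint logic,
> // not the actual FlinkKafkaProducerBase implementation.
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.RecordMetadata;
>
> class FlushOnCheckpointSketch {
>
>     private final Object pendingRecordsLock = new Object();
>     private long pendingRecords;
>     private volatile Exception asyncException;
>
>     // Passed to KafkaProducer.send(record, callback) for every record.
>     final Callback callback = new Callback() {
>         @Override
>         public void onCompletion(RecordMetadata metadata, Exception exception) {
>             if (exception != null && asyncException == null) {
>                 // Remember the failure so the next checkpoint can see it.
>                 asyncException = exception;
>             }
>             synchronized (pendingRecordsLock) {
>                 pendingRecords--;
>                 pendingRecordsLock.notifyAll();
>             }
>         }
>     };
>
>     // Called when a checkpoint barrier arrives.
>     void snapshotState() throws Exception {
>         synchronized (pendingRecordsLock) {
>             while (pendingRecords > 0) {
>                 // Block the checkpoint until every in-flight record is acknowledged.
>                 pendingRecordsLock.wait();
>             }
>         }
>         if (asyncException != null) {
>             // Fail the checkpoint instead of silently losing the record.
>             throw new Exception("Failed to send data to Kafka", asyncException);
>         }
>     }
> }
> {code}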
> Generally, to fix this, we need to handle exceptions in the callback and
> re-add the original record to the producer. I think the {{onCompletion}}
> method is called only after the KafkaProducer has internally exhausted all
> retry attempts and the record has been removed from its buffer, so if we do
> nothing with the exception other than log it, the message will be lost.
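> As a rough illustration of the re-add idea (hypothetical class, not actual
> connector code), the callback needs access to the original record so it can
> hand it back to the producer when the send fails:
> {code:java}
> // Sketch only: re-adding a failed record from the callback. The original record
> // is captured per callback; the class and its wiring are hypothetical.
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
>
> class ResendingCallback implements Callback {
>
>     private final KafkaProducer<byte[], byte[]> producer;
>     private final ProducerRecord<byte[], byte[]> record;  // the original record
>
>     ResendingCallback(KafkaProducer<byte[], byte[]> producer,
>                       ProducerRecord<byte[], byte[]> record) {
>         this.producer = producer;
>         this.record = record;
>     }
>
>     @Override
>     public void onCompletion(RecordMetadata metadata, Exception exception) {
>         if (exception != null) {
>             // Re-add the original record instead of only logging the failure;
>             // once Kafka's own retries are exhausted, the record is otherwise gone.
>             producer.send(record, new ResendingCallback(producer, record));
>         }
>     }
> }
> {code}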
> Two additional things we need to address in order to solve this:
> 1. {{FlinkKafkaProducer}} needs to keep a map from each callback to its
> corresponding original record.
> 2. We need to determine which async exceptions should actually be re-added to
> the FlinkKafkaProducer. We cannot simply re-add records for every exception;
> otherwise, errors that cannot be resolved by retrying will hang the checkpoint
> flush forever, and it will be unclear to the user why the checkpoint is taking
> so long. The ElasticsearchSink has similar issues (FLINK-5353 and FLINK-5122).
> The proposed approach, instead of determining which async exceptions to retry
> case by case, is to let the user provide async failure handlers that implement
> the logic for which exceptions to handle / re-add (a rough sketch of such a
> handler is below).
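> A minimal sketch of what such a user-provided handler could look like (the
> {{KafkaFailureHandler}} interface and its methods are assumptions for
> illustration, not an agreed-upon API), analogous in spirit to the handler
> discussed for the ElasticsearchSink:
> {code:java}
> // Sketch only: a hypothetical user-facing failure handler; all names are illustrative.
> import java.io.Serializable;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.RetriableException;
>
> /** Decides, per failed record, whether to re-add it, drop it, or fail the sink. */
> interface KafkaFailureHandler extends Serializable {
>
>     /** Hook the handler can use to put a record back into the producer. */
>     interface RecordRequeuer {
>         void requeue(ProducerRecord<byte[], byte[]> record);
>     }
>
>     /**
>      * Called from the producer callback when sending a record has failed.
>      * Throwing here should fail the sink (and any in-progress checkpoint).
>      */
>     void onFailure(ProducerRecord<byte[], byte[]> record,
>                    Exception failure,
>                    RecordRequeuer requeuer) throws Exception;
> }
>
> /** Example handler: re-add only transient Kafka errors, otherwise fail fast. */
> class RetryTransientErrorsHandler implements KafkaFailureHandler {
>     @Override
>     public void onFailure(ProducerRecord<byte[], byte[]> record,
>                           Exception failure,
>                           RecordRequeuer requeuer) throws Exception {
>         if (failure instanceof RetriableException) {
>             requeuer.requeue(record);  // transient error: re-add and let the flush wait
>         } else {
>             throw failure;             // unrecoverable: surface it instead of hanging the flush
>         }
>     }
> }
> {code}
> With something like this, the connector would only need the callback-to-record
> map from point 1 to pass the original record into {{onFailure}}.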
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)