[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5701:
---------------------------------------
    Summary: FlinkKafkaPrdocuer should check asyncException on checkpoints  
(was: FlinkKafkaPrdocuer violates at-least-once by not handling failed records)

> FlinkKafkaPrdocuer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff 
> after flushing the {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> Generally, to fix this, we need to handle exceptions in the callback and 
> re-add the original record back into the producer. I think the {{onComplete}} 
> method is called after the KafkaProducer internally finishes all retry 
> attempts and is removed from the buffer, so if we don’t do anything with the 
> exception other than just logging it, the message will be lost.
> Two additional things we need to address in order to solve this:
> 1. {{FlinkKafkaProducer}} needs to keep a map of callback to their 
> corresponding original record.
> 2. We need to determine what async exceptions to actually re-add to the 
> FlinkKafkaProducer. We simply cannot re-add for every exception, otherwise  
> errors that simply cannot be resolved by retrying will hang the checkpoint 
> flush process forever, and it'll be unclear to the user why the checkpoint is 
> taking so long. The ElasticsearchSink has similar issues (FLINK-5353 and 
> FLINK-5122). The proposed approach for this, instead of determining which 
> async exceptions to retry case by case, is to let the user provide async 
> failure handlers and let them implement logic on which exceptions to handle / 
> re-add.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to