[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-5701:
---------------------------------------
    Description: 
Reported in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} counter that is incremented on each 
{{invoke()}} and decremented in each send callback; it is used to decide 
whether the producer still needs to wait on pending callbacks when a 
checkpoint is taken.
A checkpoint should only be considered successful if, after flushing, both 
{{pendingRecords == 0}} and {{asyncException == null}} hold (currently, we're 
only checking {{pendingRecords}}).
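
For reference, a simplified model of this bookkeeping (names follow 
{{FlinkKafkaProducerBase}}, but this is an abridged sketch, not the actual 
source):

{code:java}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Simplified model of the bookkeeping described above; names follow
// FlinkKafkaProducerBase, but this is an abridged sketch, not the actual source.
class PendingRecordsBookkeeping {

    private final Object pendingRecordsLock = new Object();
    private long pendingRecords = 0;
    private volatile Exception asyncException;

    // invoke(): count the record before handing it to KafkaProducer#send()
    void beforeSend() {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }

    // Callback passed to KafkaProducer#send(record, callback)
    final Callback sendCallback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;   // remembered for a later rethrow
            }
            synchronized (pendingRecordsLock) {
                pendingRecords--;             // decremented even when the send failed
                pendingRecordsLock.notifyAll();
            }
        }
    };
}
{code}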

A quick fix for this is to check and rethrow async exceptions in the 
{{snapshotState}} method, both before flushing and after {{pendingRecords}} 
reaches 0.
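
Continuing the model above, the quick fix would look roughly like this 
({{flush()}} stands in for the connector's wait-until-flushed logic):

{code:java}
// Quick-fix sketch, continuing the PendingRecordsBookkeeping model above.
void snapshotState() throws Exception {
    checkErroneous();   // (1) rethrow async exceptions recorded before the checkpoint

    flush();            // blocks until all pending callbacks have fired
    synchronized (pendingRecordsLock) {
        if (pendingRecords != 0) {
            throw new IllegalStateException(
                "Pending records count must be zero after flushing: " + pendingRecords);
        }
    }
    checkErroneous();   // (2) rethrow async exceptions that arrived during the flush
}

// Rethrows an exception reported by a send callback, if any.
void checkErroneous() throws Exception {
    Exception e = asyncException;
    if (e != null) {
        asyncException = null;
        throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
    }
}

// Waits until pendingRecords drops to zero.
void flush() throws InterruptedException {
    synchronized (pendingRecordsLock) {
        while (pendingRecords > 0) {
            pendingRecordsLock.wait();
        }
    }
}
{code}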

  was:
Reported in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} counter that is incremented on each 
{{invoke()}} and decremented in each send callback; it is used to decide 
whether the producer still needs to wait on pending callbacks when a 
checkpoint is taken.
A checkpoint should only be considered successful if, after flushing, both 
{{pendingRecords == 0}} and {{asyncException == null}} hold (currently, we're 
only checking {{pendingRecords}}).

Generally, to fix this, we need to handle exceptions in the callback and 
re-add the original record to the producer. I think the callback's 
{{onCompletion}} method is invoked only after the KafkaProducer has internally 
exhausted all retry attempts and the record has been removed from its buffer, 
so if we do nothing with the exception other than log it, the message is lost.

Two additional things we need to address in order to solve this:
1. {{FlinkKafkaProducer}} needs to keep a map from each callback to its 
corresponding original record.

2. We need to determine which async exceptions should actually cause a re-add. 
We cannot simply re-add on every exception; otherwise, errors that cannot be 
resolved by retrying will hang the checkpoint flush forever, and it will be 
unclear to the user why the checkpoint is taking so long. The 
ElasticsearchSink has similar issues (FLINK-5353 and FLINK-5122). The proposed 
approach, instead of deciding case by case which async exceptions to retry, is 
to let the user provide an async failure handler and implement the logic for 
which exceptions to handle / re-add there (see the sketch below).
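
To make the last point concrete, such a handler could look something like the 
following. This is purely illustrative; the interface name and signature are 
hypothetical and not an existing Flink API (it mirrors the failure-handler 
idea discussed for the ElasticsearchSink in FLINK-5353):

{code:java}
import java.io.Serializable;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical user-facing failure handler; the name and signature are
// illustrative only and not an existing Flink API.
public interface KafkaFailureHandler extends Serializable {

    /**
     * Called from the producer callback when a send failed after the
     * KafkaProducer exhausted its internal retries.
     *
     * @param record   the original record that failed (from the callback-to-record map)
     * @param failure  the async exception reported by the Kafka client
     * @param requeuer re-adds the record to the producer for another attempt
     * @throws Exception to fail the sink (and the checkpoint) instead of retrying
     */
    void onFailure(ProducerRecord<byte[], byte[]> record,
                   Exception failure,
                   RecordRequeuer requeuer) throws Exception;

    /** Hypothetical hook for re-adding a failed record. */
    interface RecordRequeuer {
        void requeue(ProducerRecord<byte[], byte[]> record);
    }
}
{code}

A handler would then, for example, call {{requeuer.requeue(record)}} for 
retriable failures such as timeouts, and rethrow anything unrecoverable so 
that hopeless errors fail the checkpoint fast instead of hanging the flush.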


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} counter that is incremented on each 
> {{invoke()}} and decremented in each send callback; it is used to decide 
> whether the producer still needs to wait on pending callbacks when a 
> checkpoint is taken.
> A checkpoint should only be considered successful if, after flushing, both 
> {{pendingRecords == 0}} and {{asyncException == null}} hold (currently, 
> we're only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method, both before flushing and after {{pendingRecords}} 
> reaches 0.


