[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5701:
---------------------------------------
    Description: 
Reported in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} value that is incremented on each 
invoke() and decremented on each callback, used to check if the producer needs 
to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded iff after 
flushing the {{pendingRecords == 0}} and {{asyncException == null}} (currently, 
we’re only checking {{pendingRecords}}).

A quick fix for this is to check and rethrow async exceptions in the 
{{snapshotState}} method both before and after flushing and {{pendingRecords}} 
becomes 0.

  was:
Reported in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

The problem:

The producer holds a {{pendingRecords}} value that is incremented on each 
invoke() and decremented on each callback, used to check if the producer needs 
to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded iff after 
flushing the {{pendingRecords == 0}} and {{asyncException == null}} (currently, 
we’re only checking {{pendingRecords}}).

A quick fix for this is to check and rethrow async exceptions in the 
`snapshotState` method both before and after flushing and `pendingRecords` 
becomes 0.


> FlinkKafkaPrdocuer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff 
> after flushing the {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before and after flushing and 
> {{pendingRecords}} becomes 0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to