[
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai updated FLINK-5701:
---------------------------------------
Description:
Reported in ML:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
The problem:
The producer holds a {{pendingRecords}} counter that is incremented on each
invoke() and decremented on each callback. It is used to check whether the
producer needs to wait for pending callbacks on checkpoints.
A checkpoint should be considered successful only if, after flushing,
{{pendingRecords == 0}} and {{asyncException == null}} (currently,
we’re only checking {{pendingRecords}}).
A quick fix for this is to check for and rethrow async exceptions in the
{{snapshotState}} method, both before flushing and again after flushing, once
{{pendingRecords}} has reached 0.
was:
Reported in ML:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
The problem:
The producer holds a {{pendingRecords}} value that is incremented on each
invoke() and decremented on each callback, used to check if the producer needs
to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded iff after
flushing the {{pendingRecords == 0}} and {{asyncException == null}} (currently,
we’re only checking {{pendingRecords}}).
A quick fix for this is to check and rethrow async exceptions in the
`snapshotState` method both before and after flushing and `pendingRecords`
becomes 0.
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} counter that is incremented on each
> invoke() and decremented on each callback. It is used to check whether the
> producer needs to wait for pending callbacks on checkpoints.
> A checkpoint should be considered successful only if, after flushing,
> {{pendingRecords == 0}} and {{asyncException == null}} (currently, we’re
> only checking {{pendingRecords}}).
> A quick fix for this is to check for and rethrow async exceptions in the
> {{snapshotState}} method, both before flushing and again after flushing,
> once {{pendingRecords}} has reached 0.
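
The proposed check can be sketched roughly as follows. This is a single-threaded stand-in, not the connector's actual code: the class {{SketchProducer}}, the {{failOnAck}} flag, and the helpers {{flush}} and {{checkErroneous}} are assumptions made for illustration. Only {{pendingRecords}}, {{asyncException}}, invoke() and {{snapshotState}} come from the description above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Single-threaded stand-in for an asynchronous producer; all names are illustrative. */
public class SketchProducer {
    private long pendingRecords = 0;
    private Exception asyncException = null;
    // Callbacks of records that were sent but not yet acknowledged.
    private final Queue<Runnable> inFlight = new ArrayDeque<>();

    /** Queues a record; failOnAck simulates a send that later fails in its callback. */
    public void invoke(String record, boolean failOnAck) throws Exception {
        checkErroneous();
        pendingRecords++;
        inFlight.add(() -> {
            if (failOnAck && asyncException == null) {
                asyncException = new Exception("send failed for record: " + record);
            }
            // The counter drops on every callback, whether the send succeeded or not.
            pendingRecords--;
        });
    }

    /** Runs every outstanding callback, the way a blocking flush would wait for them. */
    private void flush() {
        Runnable callback;
        while ((callback = inFlight.poll()) != null) {
            callback.run();
        }
    }

    /** Rethrows a failure recorded by a callback, if there is one. */
    private void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data: " + e.getMessage(), e);
        }
    }

    /** The checkpoint succeeds only if nothing is pending and no callback failed. */
    public void snapshotState() throws Exception {
        checkErroneous();   // failures reported before the flush
        flush();
        if (pendingRecords != 0) {
            throw new IllegalStateException(
                "Pending record count must be zero after flush: " + pendingRecords);
        }
        checkErroneous();   // failures reported by callbacks during the flush
    }

    public long getPendingRecords() {
        return pendingRecords;
    }
}
```

In this sketch a failed send still brings {{pendingRecords}} back to 0, so checking the counter alone would let the checkpoint succeed. The second {{checkErroneous()}} call is what turns the lost record into a checkpoint failure.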