[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai resolved FLINK-5701.
----------------------------------------
    Resolution: Fixed

Additionally fixed for 1.1.5 with http://git-wip-us.apache.org/repos/asf/flink/commit/6662cc6.

> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each
> invoke() and decremented on each callback. It is used to check whether the
> producer needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff,
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing, once
> {{pendingRecords}} has reached 0.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
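The checkpoint condition described in the issue (succeed only if pendingRecords == 0 and asyncException == null after flushing) can be illustrated with a small self-contained sketch. This is not Flink's actual connector code: the class SketchProducerSink and the helpers onCompletion, checkErroneous and pendingRecords() are hypothetical names, no Kafka client is used, a single-threaded executor stands in for the producer's asynchronous send path, and "flushing" is modeled only as waiting until pendingRecords drops to 0.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for a Kafka sink (NOT Flink's implementation): each send
 * completes asynchronously on a background thread and reports back via a callback.
 */
class SketchProducerSink implements AutoCloseable {
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final Object pendingRecordsLock = new Object();
    private long pendingRecords = 0;            // guarded by pendingRecordsLock
    private volatile Exception asyncException;  // first error reported by a callback

    /** Called once per record; failSend simulates a send that the broker rejects. */
    void invoke(String record, boolean failSend) throws Exception {
        checkErroneous(); // fail fast on errors from earlier sends
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
        sender.execute(() -> onCompletion(
                failSend ? new RuntimeException("send failed for " + record) : null));
    }

    /** Callback: remember the first error, then mark the record as no longer pending. */
    private void onCompletion(Exception error) {
        // Only one callback thread exists here, so check-then-set is safe.
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        synchronized (pendingRecordsLock) {
            pendingRecords--;
            if (pendingRecords == 0) {
                pendingRecordsLock.notifyAll();
            }
        }
    }

    /** Checkpoint hook: succeeds only if nothing is pending AND no send failed. */
    void snapshotState() throws Exception {
        checkErroneous(); // before flushing
        synchronized (pendingRecordsLock) {
            while (pendingRecords > 0) {
                pendingRecordsLock.wait();
            }
        }
        checkErroneous(); // after flushing: the callbacks we just waited for may have failed
    }

    private void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            throw new Exception("Failed to send data to Kafka", e);
        }
    }

    long pendingRecords() {
        synchronized (pendingRecordsLock) {
            return pendingRecords;
        }
    }

    @Override
    public void close() throws InterruptedException {
        sender.shutdown();
        sender.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Without the second checkErroneous() call, snapshotState() would return normally in the failing case as soon as pendingRecords reached 0, which is exactly the "checkpoint succeeds although a record was lost" situation the issue describes. Checking before the wait as well lets a checkpoint fail early when an error is already known.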