[
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857494#comment-15857494
]
ASF GitHub Bot commented on FLINK-5701:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3278
Instead of directly re-adding
`testAtLeastOnceProducerFailsIfFlushingDisabled`, I added a test
`testDoesNotWaitForPendingRecordsIfFlushingDisabled` that simply verifies that the
snapshot method returns even if there are pending records.
The purpose of the `testDoesNotWaitForPendingRecordsIfFlushingDisabled`
test is to ensure that the actual `testAtLeastOnceProducer` test is valid.
I think this was what the original
`testAtLeastOnceProducerFailsIfFlushingDisabled` was actually meant for anyway.
@tillrohrmann please let me know if this makes sense to you :-)
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Critical
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each
> invoke() and decremented on each callback, used to check if the producer
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful iff, after
> flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing, once
> {{pendingRecords}} has become 0.
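
The check described in the issue can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual FlinkKafkaProducer code: the class name `AtLeastOnceProducerSketch`, the `flushOnCheckpoint` flag, and the helper methods `onCallback`, `flush`, and `checkErroneous` are hypothetical names chosen for the example, and the Kafka client is replaced by a plain counter. Unchecked exceptions are used only to keep the sketch short.

```java
/** Hypothetical stand-in for the producer; not Flink's actual class. */
class AtLeastOnceProducerSketch {
    private final boolean flushOnCheckpoint;
    private final Object pendingLock = new Object();
    private long pendingRecords;                // guarded by pendingLock
    private volatile Exception asyncException;  // set by the send callback

    AtLeastOnceProducerSketch(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    /** Called once per record; a real producer would hand the record to Kafka here. */
    void invoke() {
        checkErroneous();
        synchronized (pendingLock) {
            pendingRecords++;
        }
    }

    /** Called when a send completes; error is null on success. */
    void onCallback(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        synchronized (pendingLock) {
            pendingRecords--;
            pendingLock.notifyAll();
        }
    }

    /** Checks for async failures before flushing and again once nothing is pending. */
    void snapshotState() {
        checkErroneous();
        if (flushOnCheckpoint) {
            flush();
            checkErroneous();
        }
        // With flushing disabled, the method returns even if records are pending.
    }

    long pendingRecords() {
        synchronized (pendingLock) {
            return pendingRecords;
        }
    }

    /** Blocks until every pending record has been acknowledged or has failed. */
    private void flush() {
        synchronized (pendingLock) {
            while (pendingRecords > 0) {
                try {
                    pendingLock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while flushing", e);
                }
            }
        }
    }

    /** Rethrows a failure recorded by a callback, reporting it once. */
    private void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new IllegalStateException("Failed to send data: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        AtLeastOnceProducerSketch producer = new AtLeastOnceProducerSketch(true);
        producer.invoke();
        producer.onCallback(new RuntimeException("simulated send failure"));
        try {
            producer.snapshotState();
            System.out.println("checkpoint succeeded");
        } catch (IllegalStateException e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

In this sketch a failed callback still decrements the counter, so checking only `pendingRecords` would let the checkpoint succeed after a lost record. The second `checkErroneous()` call is what turns that case into a checkpoint failure.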