[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854349#comment-15854349 ]

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3278

    [FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints

    Prior to this change, the FlinkKafkaProducer violated at-least-once
    semantics, because we did not fail checkpoints when the Kafka producer had
    reported async exceptions.
    
    With this PR, `snapshotState()` fails if an async exception was reported
    before the checkpoint. It also fails if flushing the pending records on the
    checkpoint results in exceptions.
    
    This PR also improves the tests in `FlinkKafkaProducerBaseTest` to use
    one-shot latches instead of sleeping, which makes them more stable. It also
    removes the test `testAtLeastOnceProducerFailsIfFlushingDisabled()`. My
    reasoning is that you _might_ still have at-least-once even if flushing is
    disabled (i.e., if there are never any pending records to flush on a
    checkpoint), so I don't see the need for that test. I'm open to discussing
    its removal and adding it back if others think it's necessary.
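The latch-instead-of-sleep testing style mentioned above might look roughly like this. It is a hypothetical sketch, not the actual `FlinkKafkaProducerBaseTest` code: a `java.util.concurrent.CountDownLatch` with a count of 1 stands in for a one-shot latch, and the "snapshot" and "callback" are simulated. The test waits for an explicit signal that the snapshot thread has started flushing rather than sleeping for an arbitrary time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch of a latch-based test (not the real Flink test).
 * CountDownLatch(1) is used as a one-shot latch.
 */
public class LatchInsteadOfSleepSketch {

    /** Returns true if the simulated snapshot blocks until the pending record's callback fires. */
    public static boolean snapshotBlocksUntilCallback() {
        CountDownLatch flushStarted = new CountDownLatch(1);  // fired once the snapshot starts flushing
        CountDownLatch callbackFired = new CountDownLatch(1); // stands in for the producer callback
        AtomicBoolean snapshotDone = new AtomicBoolean(false);

        Thread snapshotThread = new Thread(() -> {
            flushStarted.countDown();
            try {
                callbackFired.await();   // simulated flush: wait for the pending record
                snapshotDone.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        snapshotThread.setDaemon(true);  // never keep the JVM alive if the test bails out
        snapshotThread.start();

        try {
            // Instead of Thread.sleep(...) and hoping the thread got far enough,
            // wait for an explicit signal (with a timeout so a bug fails rather than hangs).
            if (!flushStarted.await(10, TimeUnit.SECONDS)) {
                return false;
            }
            boolean blockedWhilePending = !snapshotDone.get();

            callbackFired.countDown();   // acknowledge the pending record
            snapshotThread.join();
            return blockedWhilePending && snapshotDone.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshotBlocksUntilCallback()); // prints true
    }
}
```

Because `snapshotDone` can only be set after `callbackFired` is released, the "still blocked" check is deterministic, which is the stability gain over a sleep-based test.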

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5701

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3278.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3278
    
----
commit c3eb0a905b86c6265af91c4bda7d2c9da2dc6ce2
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-02-06T16:37:13Z

    [FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints

----


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} counter that is incremented on each
> {{invoke()}} and decremented on each callback; it is used to check whether
> the producer needs to wait for pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful if and only
> if, after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we only check {{pendingRecords}}).
> A quick fix is to check for and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing, once
> {{pendingRecords}} has reached 0.
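Why checking {{pendingRecords}} alone is not enough can be shown with a small single-threaded sketch. It is hypothetical and not Flink's code: the class and method names are made up for illustration, callbacks are invoked by hand, and it assumes (as the issue implies) that the callback decrements the counter even when the send failed. With one failed record, the counter still reaches 0, so a counter-only check lets the checkpoint succeed, while the proposed check fails it.

```java
/**
 * Hypothetical, single-threaded sketch of the bookkeeping described in the issue:
 * pendingRecords alone cannot tell a successful flush from a failed one.
 */
public class PendingRecordsCheckSketch {

    private final Object pendingRecordsLock = new Object();
    private long pendingRecords = 0;           // guarded by pendingRecordsLock
    private volatile Exception asyncException; // first error seen by a callback

    /** invoke(): one more record awaiting its callback. */
    void invoke() {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }

    /** Callback: record the error (if any), then decrement, even on failure. */
    void onCompletion(Exception e) {
        if (e != null && asyncException == null) {
            asyncException = e;
        }
        synchronized (pendingRecordsLock) {
            pendingRecords--;
            if (pendingRecords == 0) {
                pendingRecordsLock.notifyAll();
            }
        }
    }

    private void waitForPendingRecords() throws InterruptedException {
        synchronized (pendingRecordsLock) {
            while (pendingRecords > 0) {
                pendingRecordsLock.wait();
            }
        }
    }

    private void rethrowAsyncException() throws Exception {
        if (asyncException != null) {
            throw new Exception("Async send failed", asyncException);
        }
    }

    /** Behaviour described in the issue: only pendingRecords is checked. */
    void snapshotStateCounterOnly() throws Exception {
        waitForPendingRecords();
    }

    /** Proposed fix: rethrow async exceptions before flushing and once pendingRecords is 0. */
    void snapshotStateChecked() throws Exception {
        rethrowAsyncException();
        waitForPendingRecords();
        rethrowAsyncException();
    }

    /** Two records, the second fails; returns whether the checkpoint succeeds. */
    public static boolean checkpointSucceeds(boolean checkAsyncException) {
        PendingRecordsCheckSketch p = new PendingRecordsCheckSketch();
        p.invoke();
        p.invoke();
        p.onCompletion(null);                                 // first record acknowledged
        p.onCompletion(new RuntimeException("broker error")); // second record lost, counter still reaches 0
        try {
            if (checkAsyncException) {
                p.snapshotStateChecked();
            } else {
                p.snapshotStateCounterOnly();
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(checkpointSucceeds(false)); // prints true: the failed record goes unnoticed
        System.out.println(checkpointSucceeds(true));  // prints false: the checkpoint is failed
    }
}
```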



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
