[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860987#comment-15860987
 ] 

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    @tillrohrmann I've hopefully addressed your comments. Could you please have 
a look?
    
    The major change was to refactor the `DummyFlinkKafkaProducer` so that a 
mock `KafkaProducer` can be provided for tests where a simple 
`verify(mockProducer).send(...)` is sufficient. The refactored version also has 
the advantage that it verifies the `Callback` implementation in the connector 
is correct (that it correctly decrements the `pendingRecords` value).
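
To illustrate the idea of handing the connector a fake producer and checking 
the callback bookkeeping, here is a minimal, dependency-free sketch. 
`SendCallback`, `RecordingProducer`, and `CountingSink` are hypothetical 
stand-ins invented for this example; they are not the classes in the PR, and 
the fake is hand-rolled rather than created with a mocking library.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback type, standing in for the Kafka client's callback. */
interface SendCallback {
    void onCompletion(Exception error);
}

/** Hand-rolled fake producer: records each send and lets a test fire callbacks manually. */
final class RecordingProducer {
    final List<String> sent = new ArrayList<>();
    final List<SendCallback> callbacks = new ArrayList<>();

    void send(String record, SendCallback callback) {
        sent.add(record);
        callbacks.add(callback);
    }
}

/** Hypothetical sink that mirrors the pendingRecords bookkeeping described above. */
final class CountingSink {
    private final RecordingProducer producer;
    private long pendingRecords;

    CountingSink(RecordingProducer producer) {
        this.producer = producer;
    }

    /** Increments the counter, then hands the record to the producer with a decrementing callback. */
    void invoke(String record) {
        synchronized (this) {
            pendingRecords++;
        }
        producer.send(record, error -> {
            synchronized (this) {
                pendingRecords--;
            }
        });
    }

    synchronized long getPendingRecords() {
        return pendingRecords;
    }
}
```

A test can then assert on `producer.sent` (the hand-rolled equivalent of a 
`verify(...)` call) and complete the entries of `producer.callbacks` one by one 
to check that the counter drops back to zero.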
    
    Note: the only thing I could not really improve is the 
`testAtLeastOnceProducer` test. As before, the refactored version still 
essentially keeps checking whether the snapshot method has returned 
early. The test is stable; I just don't think there is a better 
approach for it.
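
For readers unfamiliar with this kind of test, the following is a rough sketch 
of the "has the snapshot returned early?" check, not the actual 
`testAtLeastOnceProducer` code. `BlockingSnapshotSketch` and all of its methods 
are hypothetical, and waiting on a lock stands in for flushing the producer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sink whose snapshotState() blocks while records are pending. */
final class BlockingSnapshotSketch {
    private final Object lock = new Object();
    private long pendingRecords;

    void recordSent() {
        synchronized (lock) {
            pendingRecords++;
        }
    }

    void recordAcked() {
        synchronized (lock) {
            pendingRecords--;
            lock.notifyAll();
        }
    }

    /** Blocks until every pending record has been acknowledged. */
    void snapshotState() throws InterruptedException {
        synchronized (lock) {
            while (pendingRecords > 0) {
                lock.wait();
            }
        }
    }

    /**
     * Runs snapshotState() on a separate thread with one record pending.
     * Returns true if the snapshot stayed blocked for pollMillis and then
     * completed once the record was acknowledged.
     */
    static boolean snapshotWaitsForAck(long pollMillis) {
        BlockingSnapshotSketch sink = new BlockingSnapshotSketch();
        sink.recordSent();
        CountDownLatch returned = new CountDownLatch(1);
        Thread snapshotThread = new Thread(() -> {
            try {
                sink.snapshotState();
                returned.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        snapshotThread.setDaemon(true);
        snapshotThread.start();
        try {
            // The snapshot must not return while the record is still pending.
            boolean returnedEarly = returned.await(pollMillis, TimeUnit.MILLISECONDS);
            sink.recordAcked();
            boolean returnedAfterAck = returned.await(5, TimeUnit.SECONDS);
            return !returnedEarly && returnedAfterAck;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing the snapshot", e);
        }
    }
}
```

A check like this can only observe that the snapshot did not return within the 
polling window; it cannot prove that it would never return early, which is why 
this style of test is hard to improve on.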


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful iff, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check for and rethrow async exceptions in the 
> {{snapshotState}} method, both before flushing and after flushing, once 
> {{pendingRecords}} has reached 0.
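
The quick fix described in the issue can be sketched roughly as follows. This 
is a simplified, self-contained illustration under stated assumptions, not the 
connector's actual code: `AsyncErrorCheckSketch` is a hypothetical class, 
waiting on a lock stands in for flushing the Kafka producer, and failures are 
rethrown as unchecked exceptions to keep the example short.

```java
/** Hypothetical stand-in for the producer's checkpoint bookkeeping. */
final class AsyncErrorCheckSketch {
    private final Object pendingLock = new Object();
    private long pendingRecords;
    private volatile Exception asyncException;

    /** Called for each record; fails fast if an earlier send already failed. */
    void invoke() {
        checkErroneous();
        synchronized (pendingLock) {
            pendingRecords++;
        }
    }

    /** Called when a send completes; error is null on success. */
    void onCompletion(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        synchronized (pendingLock) {
            pendingRecords--;
            pendingLock.notifyAll();
        }
    }

    /** The checkpoint succeeds only if pendingRecords == 0 and no async error was recorded. */
    void snapshotState() {
        checkErroneous(); // check before flushing
        synchronized (pendingLock) {
            while (pendingRecords > 0) {
                try {
                    pendingLock.wait(); // assumption: stands in for flushing the producer
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while flushing", e);
                }
            }
        }
        checkErroneous(); // check again once pendingRecords has reached 0
    }

    /** Rethrows a recorded async failure and clears it so it is not thrown twice. */
    private void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new IllegalStateException("Failed to send data: " + e.getMessage(), e);
        }
    }
}
```

The second check matters because a failed send also decrements 
`pendingRecords`: without it, a callback that reports an error during the 
flush would bring the counter to 0 and the checkpoint would be treated as 
successful.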



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
