[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867860#comment-15867860 ]

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    @tillrohrmann thanks for your second-pass review; the tips you mentioned 
were helpful. I've incorporated all of your comments except:
    
    > Maybe you could override the DummyFlinkKafkaProducer#flush method to 
insert some latches to see when you enter and when the method is done. Then you 
could wait on the first latch and check with the latter whether the method has 
completed.
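The latch approach suggested here can be sketched as follows. This is a self-contained illustration, not Flink test code: `BlockingFlushProducer`, `flushEntered`, `flushDone` and `pendingAck` are hypothetical names, and a third `java.util.concurrent.CountDownLatch` stands in for the outstanding broker acknowledgement that a real `flush` would wait on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchFlushSketch {

    // Hypothetical stand-in for a test producer; NOT Flink's DummyFlinkKafkaProducer.
    static class BlockingFlushProducer {
        final CountDownLatch flushEntered = new CountDownLatch(1); // fired on entering flush()
        final CountDownLatch flushDone = new CountDownLatch(1);    // fired just before flush() returns
        final CountDownLatch pendingAck = new CountDownLatch(1);   // simulated outstanding ack

        void flush() throws InterruptedException {
            flushEntered.countDown();
            pendingAck.await(); // block until the simulated callback arrives
            flushDone.countDown();
        }

        void snapshotState() throws InterruptedException {
            flush();
        }
    }

    /** Returns true if flush() stayed blocked while the ack was pending, then completed. */
    static boolean run() throws InterruptedException {
        BlockingFlushProducer producer = new BlockingFlushProducer();
        Thread snapshotThread = new Thread(() -> {
            try {
                producer.snapshotState();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        snapshotThread.start();

        producer.flushEntered.await(); // the snapshot thread is now inside flush()
        boolean blockedWhilePending = producer.flushDone.getCount() == 1;

        producer.pendingAck.countDown(); // deliver the simulated ack
        boolean completed = producer.flushDone.await(5, TimeUnit.SECONDS);
        snapshotThread.join();
        return blockedWhilePending && completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "true"
    }
}
```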
    
    I think this is over-complicating things. I don't see the need to add a 
way to explicitly wait for the `flush` method to complete: it's called in the 
`snapshotState` method, so isn't that identical to waiting for the snapshot 
thread to complete?
    
    Instead, I overrode the `snapshotState` method and added a flag inside it 
to make sure that, when the base `snapshotState` implementation returns, it has 
already finished calling `flush`. Whether or not `flush` blocks correctly is 
out of scope for these tests, because `flush` is actually an abstract method 
that the Kafka version-specific concrete subclasses implement.
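A minimal sketch of the flag-based check described in this comment, assuming a hypothetical abstract base class `BaseProducer` whose `snapshotState()` calls an abstract `flush()`. The class and field names are illustrative and not taken from the actual PR; the point is only that the flag is read after the base `snapshotState` returns.

```java
class FlagSnapshotSketch {

    // Hypothetical stand-in for the abstract producer base class.
    abstract static class BaseProducer {
        // Implemented by version-specific subclasses; whether it blocks correctly is their concern.
        protected abstract void flush();

        public void snapshotState() {
            flush();
        }
    }

    // Hypothetical test producer recording whether flush() ran before snapshotState() returned.
    static class DummyProducer extends BaseProducer {
        private boolean flushCalled = false;
        boolean snapshotReturnedAfterFlush = false;

        @Override
        protected void flush() {
            flushCalled = true;
        }

        @Override
        public void snapshotState() {
            super.snapshotState();
            // The base implementation has returned, so flush() must already have been called.
            snapshotReturnedAfterFlush = flushCalled;
        }
    }

    static boolean run() {
        DummyProducer producer = new DummyProducer();
        producer.snapshotState();
        return producer.snapshotReturnedAfterFlush;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true"
    }
}
```

Compared with the latch version, this only asserts the call ordering inside `snapshotState`; it deliberately says nothing about how long `flush` blocks.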


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported on the mailing list: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback; on checkpoints it is used to check 
> whether the producer needs to wait for pending callbacks.
> On each checkpoint, we should only consider the checkpoint successful iff, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before flushing and after flushing, once 
> {{pendingRecords}} has dropped to 0.
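The proposed fix can be illustrated with a simplified, hypothetical producer. `SketchProducer`, `onCompletion`, `checkErroneous` and `pendingCount` are illustrative names, not the actual Flink producer code: `pendingRecords` is guarded by a lock, callbacks remember the first async failure, and `snapshotState()` checks for that failure both before and after flushing. A background thread stands in for the Kafka client's callback thread.

```java
class AsyncExceptionCheckSketch {

    // Hypothetical, simplified producer; not the actual Flink Kafka producer.
    static class SketchProducer {
        private final Object lock = new Object();
        private long pendingRecords = 0;           // guarded by lock
        private volatile Exception asyncException; // first failure reported by a callback

        void invoke(String record) throws Exception {
            checkErroneous(); // fail fast on failures from earlier sends
            synchronized (lock) {
                pendingRecords++;
            }
            // A real producer would hand `record` to the Kafka client here.
        }

        // Called from the client's callback thread when a send completes.
        void onCompletion(Exception exception) {
            synchronized (lock) {
                if (exception != null && asyncException == null) {
                    asyncException = exception;
                }
                if (--pendingRecords == 0) {
                    lock.notifyAll();
                }
            }
        }

        long pendingCount() {
            synchronized (lock) {
                return pendingRecords;
            }
        }

        void flush() throws InterruptedException {
            synchronized (lock) {
                while (pendingRecords > 0) {
                    lock.wait();
                }
            }
        }

        void snapshotState() throws Exception {
            checkErroneous(); // before flushing
            flush();          // returns once pendingRecords == 0
            checkErroneous(); // after flushing: a callback may have failed meanwhile
        }

        private void checkErroneous() throws Exception {
            Exception e = asyncException;
            if (e != null) {
                throw new Exception("Asynchronous send failed", e);
            }
        }
    }

    // Returns true if the checkpoint fails because a callback reported an error.
    static boolean checkpointFails() throws Exception {
        SketchProducer producer = new SketchProducer();
        producer.invoke("record-1");
        Thread callbackThread = new Thread(
                () -> producer.onCompletion(new RuntimeException("simulated broker error")));
        callbackThread.start();
        try {
            producer.snapshotState();
            return false;
        } catch (Exception expected) {
            return true;
        } finally {
            callbackThread.join();
        }
    }

    // Returns true if a checkpoint with only successful callbacks completes cleanly.
    static boolean checkpointSucceeds() throws Exception {
        SketchProducer producer = new SketchProducer();
        producer.invoke("record-1");
        Thread callbackThread = new Thread(() -> producer.onCompletion(null));
        callbackThread.start();
        producer.snapshotState();
        callbackThread.join();
        return producer.pendingCount() == 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(checkpointFails());    // prints "true"
        System.out.println(checkpointSucceeds()); // prints "true"
    }
}
```

Checking only before flushing is not enough: in `checkpointFails()` the failing callback can arrive while `flush()` is waiting, in which case only the post-flush check catches it and fails the checkpoint.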



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
