[
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867860#comment-15867860
]
ASF GitHub Bot commented on FLINK-5701:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3278
@tillrohrmann thanks for your second-pass review, the tips you mentioned
were helpful. I've incorporated all of your comments except:
> Maybe you could override the DummyFlinkKafkaProducer#flush method to
insert some latches to see when you enter and when the method is done. Then you
could wait on the first latch and check with the latter whether the method has
completed.
I think this is over-complicating things. I don't think it makes sense to
add ways to explicitly wait for the `flush` method to complete - it's called in
the `snapshotState` method, so isn't it identical to waiting for the snapshot
thread to complete?
Instead, I overrode the `snapshotState` method and added a flag inside it to
make sure that when the base `snapshotState` implementation returns, it
returns only after it has finished calling `flush`. Whether or not `flush` blocks
correctly is out-of-scope of these tests, because `flush` is actually an
abstract method for Kafka version-specific concrete subclasses to implement.
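A minimal sketch of the flag-based check described above. The classes `BaseProducerSketch` and `FlagCheckingProducer` are hypothetical stand-ins, not the actual `FlinkKafkaProducerBase` or `DummyFlinkKafkaProducer` code, and the real `snapshotState` takes a context argument that is omitted here:

```java
// Hypothetical stand-in for the producer base class (not the real Flink class).
abstract class BaseProducerSketch {
    // Version-specific subclasses implement the actual blocking flush.
    protected abstract void flush();

    // The base implementation is expected to call flush() before returning.
    public void snapshotState() {
        flush();
    }
}

// Hypothetical test double: records whether flush() ran during the snapshot.
class FlagCheckingProducer extends BaseProducerSketch {
    private volatile boolean flushed = false;

    @Override
    protected void flush() {
        // A dummy flush: it only sets the flag, it does not talk to Kafka.
        flushed = true;
    }

    @Override
    public void snapshotState() {
        flushed = false;
        super.snapshotState();
        // If the base implementation returned without flushing, fail loudly.
        if (!flushed) {
            throw new IllegalStateException(
                "snapshotState returned before flush was called");
        }
    }

    public boolean flushedDuringSnapshot() {
        return flushed;
    }
}
```

With this shape, a test only has to wait for the snapshot call to return; no separate latches around `flush` are needed.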
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Critical
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each
> invoke() and decremented on each callback, used to check if the producer
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful iff,
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check for and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing, once
> {{pendingRecords}} becomes 0.
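The quick fix in the issue description could look roughly like the following sketch. `SketchProducer`, `acknowledge`, and `checkErroneous` are hypothetical names used for illustration; this is a simplified model of the idea, not the actual Flink patch, and the real send to Kafka is left out:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the producer (not the real Flink class).
abstract class SketchProducer {
    // Records sent but not yet acknowledged by a callback.
    protected final AtomicLong pendingRecords = new AtomicLong();
    // First exception reported by an asynchronous send callback, if any.
    protected volatile Exception asyncException;

    // Version-specific subclasses block here until pending records are acknowledged.
    protected abstract void flush();

    // Called for every record; the actual send is omitted in this sketch.
    public void invoke(String record) {
        pendingRecords.incrementAndGet();
    }

    // Called from the send callback; error is null on success.
    public void acknowledge(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        pendingRecords.decrementAndGet();
    }

    public void snapshotState() throws Exception {
        checkErroneous(); // errors seen before the checkpoint started
        flush();
        if (pendingRecords.get() != 0) {
            throw new IllegalStateException(
                "Pending records after flush: " + pendingRecords.get());
        }
        checkErroneous(); // errors reported by callbacks that ran during flush
    }

    // Rethrows a stored async exception so that the checkpoint fails.
    protected void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }
}
```

The second check matters because `pendingRecords` also drops to 0 when every callback reports a failure, so the counter alone cannot distinguish a clean flush from a lossy one.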