[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857769#comment-15857769 ]

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100030377
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
                testHarness.close();
        }
     
    +   /**
    +    * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +    * the snapshot method does indeed finish without waiting for pending records;
    +    * we set a timeout because the test will not finish if the logic is broken.
    +    */
    +   @Test(timeout=5000)
    +   public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +           final OneShotLatch inputLatch = new OneShotLatch();
    +
    +           final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +           producer.setFlushOnCheckpoint(false);
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +           testHarness.open();
    +
    +           List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +           for (int i = 0; i < 100; i++) {
    +                   testHarness.processElement(new StreamRecord<>("msg-" + i));
    +           }
    +
    +           inputLatch.await();
    +
    +           // make sure that none of the callbacks have completed yet
    +           Assert.assertEquals(100, pending.size());
    +
    +           // use a separate thread to continuously monitor whether snapshotting has returned
    +           final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +           Thread snapshotThread = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   testHarness.snapshot(123L, 123L);
    +                           } catch (Exception e) {
    +                                   runnableError.f0 = e;
    +                           }
    +                   }
    +           });
    --- End diff --
    
    Why this additional `Thread`? I think calling `testHarness.snapshot` directly is perfectly fine. Whether you block on this call or on the `Thread.join` does not make a difference.
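    The reviewer's point generalizes: a hang is caught by `@Test(timeout=5000)` whether the blocking happens in the test thread or in a spawned thread the test joins on. A minimal stand-alone illustration (this is not the Flink test-harness API; `snapshot` here is a hypothetical stand-in that only blocks when flush-on-checkpoint is enabled):

```java
import java.util.concurrent.CountDownLatch;

class DirectVsThreadDemo {

    // stand-in for testHarness.snapshot(): with flushing enabled it would
    // block until all pending callbacks complete; with flushing disabled
    // it returns immediately
    static void snapshot(boolean flushOnCheckpoint, CountDownLatch pending)
            throws InterruptedException {
        if (flushOnCheckpoint) {
            pending.await(); // blocks while callbacks are outstanding
        }
    }

    // direct call, as the reviewer suggests: if snapshot() blocked here,
    // the surrounding @Test(timeout=...) would fail the test, exactly as
    // it would if we blocked on Thread.join() of a monitor thread instead
    static boolean snapshotReturnsDirectly() {
        try {
            snapshot(false, new CountDownLatch(100)); // 100 pending records
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }
}
```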


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, the checkpoint should only be considered successful if, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before flushing and after 
> {{pendingRecords}} reaches 0.
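
The quick fix described in the issue can be sketched as follows. This is an illustrative stand-in, not the actual {{FlinkKafkaProducerBase}} code: the class and method names are hypothetical, and the blocking flush loop is elided.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the described pattern: rethrow async send
// failures on checkpoints, both before and after flushing.
class SketchProducer {

    private volatile Exception asyncException; // set by the send callback
    private final AtomicLong pendingRecords = new AtomicLong();

    void invoke(String record) {
        pendingRecords.incrementAndGet();
        // the real producer would call send() here, registering a
        // callback that eventually invokes onCallback(...)
    }

    // what the Kafka send callback would do on completion
    void onCallback(Exception failure) {
        if (failure != null && asyncException == null) {
            asyncException = failure;
        }
        pendingRecords.decrementAndGet();
    }

    // rethrow a recorded async failure, if any
    void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    void snapshotState() throws Exception {
        // rethrow failures that arrived since the last checkpoint,
        // before flushing even starts
        checkErroneous();

        // the real producer would flush here, blocking until
        // pendingRecords drops to zero (elided in this sketch)

        // a callback may have failed while flushing was in progress
        checkErroneous();
    }
}
```

With this shape, a checkpoint fails fast whether the async error arrived before the checkpoint began or during the flush itself.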



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
