[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862839#comment-15862839 ]

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695842
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
        @Test
        public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
                KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
                RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
                when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
                when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +           
    +           // out-of-order list of 4 partitions
    +           List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +           
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +           when(mockProducer.metrics()).thenReturn(null);
     
                DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -                   FakeStandardProducerConfig.get(), mockPartitioner);
    +                   FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
                producer.setRuntimeContext(mockRuntimeContext);
     
                producer.open(new Configuration());
     
    -           // the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -           // which should be sorted before provided to the custom partitioner's open() method
    +           // the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
                int[] correctPartitionList = {0, 1, 2, 3};
                verify(mockPartitioner).open(0, 1, correctPartitionList);
        }
     
        /**
    -    * Test ensuring that the producer is not dropping buffered records.;
    -    * we set a timeout because the test will not finish if the logic is broken
    +    * Test ensuring that if an invoke call happens right after an async exception is caught, the exception should be rethrown.
         */
    -   @Test(timeout=5000)
    -   public void testAtLeastOnceProducer() throws Throwable {
    -           runAtLeastOnceTest(true);
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +                   return;
    +           }
    +
    +           Assert.fail();
        }
     
        /**
    -    * Ensures that the at least once producing test fails if the flushing is disabled
    +    * Test ensuring that if a snapshot call happens right after an async exception is caught, the exception should be rethrown.
         */
    -   @Test(expected = AssertionError.class, timeout=5000)
    -   public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -           runAtLeastOnceTest(false);
    -   }
    -
    -   private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -           final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
                final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -                   FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -           producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
 
                OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -                           new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
                testHarness.open();
     
    -           for (int i = 0; i < 100; i++) {
    -                   testHarness.processElement(new StreamRecord<>("msg-" + i));
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the snapshot call should rethrow the async exception
    +                   Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +                   return;
                }
     
    -           // start a thread confirming all pending records
    -           final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -           final Thread threadA = Thread.currentThread();
    +           Assert.fail();
    +   }
     
    -           Runnable confirmer = new Runnable() {
    +   /**
    +    * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +    * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +    *
    +    * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +    * The test for that is covered in testAtLeastOnceProducer.
    +    */
    +   @Test(timeout=5000)
    +   public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +           // only let the first callback succeed for now
    +           producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +           final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +           Thread snapshotThread = new Thread(new Runnable() {
    --- End diff --
    
    Here you can use a `CheckedThread` instead.
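    
    For illustration, a rough sketch of what that could look like, assuming
    Flink's `org.apache.flink.core.testutils.CheckedThread` (whose `sync()`
    rethrows any exception thrown from `go()`); the harness and checkpoint ids
    follow the test above:
    
        final CheckedThread snapshotThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                // expected to block until the pending records are flushed,
                // rethrowing the async exception if one was recorded
                testHarness.snapshot(123L, 123L);
            }
        };
        snapshotThread.start();
    
        // ... complete the remaining callbacks, then:
        snapshotThread.sync(); // rethrows anything thrown inside go()
    
    This would replace the `Tuple1<Throwable>` error holder and the manual
    error propagation between threads.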


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each completed callback; it is used to check 
> whether the producer needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded if, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before and after the flush that brings 
> {{pendingRecords}} to 0.
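
A minimal sketch of that quick fix, assuming the helper and field names used in
FlinkKafkaProducerBase (checkErroneous(), flush(), pendingRecords,
pendingRecordsLock); the exact snapshotState signature may differ by version:

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // rethrow any async exception recorded by an earlier send() callback
        checkErroneous();

        if (flushOnCheckpoint) {
            // assumed to block until all pending records have been acknowledged
            flush();

            synchronized (pendingRecordsLock) {
                if (pendingRecords != 0) {
                    throw new IllegalStateException(
                            "pending records count must be zero at this point: " + pendingRecords);
                }
            }

            // a callback that completed during the flush may itself have
            // recorded an async exception, so check again after flushing
            checkErroneous();
        }
    }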



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
