[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874235#comment-15874235
 ] 

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101971157
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 ---
    @@ -88,201 +85,321 @@ public void testKeyValueDeserializersSetIfMissing() 
throws Exception {
        @Test
        public void testPartitionerOpenedWithDeterminatePartitionList() throws 
Exception {
                KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
                RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
                when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
                
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    -
    -           DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    +           
    +           // out-of-order list of 4 partitions
    +           List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +
    +           final DummyFlinkKafkaProducer producer = new 
DummyFlinkKafkaProducer(
                        FakeStandardProducerConfig.get(), mockPartitioner);
                producer.setRuntimeContext(mockRuntimeContext);
     
    +           final KafkaProducer mockProducer = 
producer.getMockKafkaProducer();
    +           
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +           when(mockProducer.metrics()).thenReturn(null);
    +
                producer.open(new Configuration());
     
    -           // the internal mock KafkaProducer will return an out-of-order 
list of 4 partitions,
    -           // which should be sorted before provided to the custom 
partitioner's open() method
    +           // the out-of-order partitions list should be sorted before 
provided to the custom partitioner's open() method
                int[] correctPartitionList = {0, 1, 2, 3};
                verify(mockPartitioner).open(0, 1, correctPartitionList);
        }
     
        /**
    -    * Test ensuring that the producer is not dropping buffered records.;
    -    * we set a timeout because the test will not finish if the logic is 
broken
    +    * Test ensuring that if an invoke call happens right after an async 
exception is caught, it should be rethrown
         */
    -   @Test(timeout=5000)
    -   public void testAtLeastOnceProducer() throws Throwable {
    -           runAtLeastOnceTest(true);
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(e.getCause().getMessage().contains("artificial async 
exception"));
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
        }
     
        /**
    -    * Ensures that the at least once producing test fails if the flushing 
is disabled
    +    * Test ensuring that if a snapshot call happens right after an async 
exception is caught, it should be rethrown
         */
    -   @Test(expected = AssertionError.class, timeout=5000)
    -   public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws 
Throwable {
    -           runAtLeastOnceTest(false);
    -   }
    -
    -   private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws 
Throwable {
    -           final AtomicBoolean snapshottingFinished = new 
AtomicBoolean(false);
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    -                   FakeStandardProducerConfig.get(), null, 
snapshottingFinished);
    -           producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +                   FakeStandardProducerConfig.get(), null);
     
                OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -                           new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
     
                testHarness.open();
     
    -           for (int i = 0; i < 100; i++) {
    -                   testHarness.processElement(new StreamRecord<>("msg-" + 
i));
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(e.getCause().getMessage().contains("artificial async 
exception"));
    +
    +                   // test succeeded
    +                   return;
                }
     
    -           // start a thread confirming all pending records
    -           final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -           final Thread threadA = Thread.currentThread();
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if an async exception is caught for one of the 
flushed requests on checkpoint,
    +    * it should be rethrown; we set a timeout because the test will not 
finish if the logic is broken.
    +    *
    +    * Note that this test does not test that the snapshot method is blocked 
correctly when there are pending records.
    +    * The test for that is covered in testAtLeastOnceProducer.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout=5000)
    +   public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws 
Throwable {
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), 
any(Callback.class));
    +
    +           // only let the first callback succeed for now
    +           producer.getPendingCallbacks().get(0).onCompletion(null, null);
     
    -           Runnable confirmer = new Runnable() {
    +           CheckedThread snapshotThread = new CheckedThread() {
                        @Override
    -                   public void run() {
    -                           try {
    -                                   MockProducer mp = 
producer.getProducerInstance();
    -                                   List<Callback> pending = 
mp.getPending();
    -
    -                                   // we need to find out if the 
snapshot() method blocks forever
    -                                   // this is not possible. If snapshot() 
is running, it will
    -                                   // start removing elements from the 
pending list.
    -                                   synchronized (threadA) {
    -                                           threadA.wait(500L);
    -                                   }
    -                                   // we now check that no records have 
been confirmed yet
    -                                   Assert.assertEquals(100, 
pending.size());
    -                                   Assert.assertFalse("Snapshot method 
returned before all records were confirmed",
    -                                           snapshottingFinished.get());
    -
    -                                   // now confirm all checkpoints
    -                                   for (Callback c: pending) {
    -                                           c.onCompletion(null, null);
    -                                   }
    -                                   pending.clear();
    -                           } catch(Throwable t) {
    -                                   runnableError.f0 = t;
    -                           }
    +                   public void go() throws Exception {
    +                           // this should block at first, since there are 
still two pending records that needs to be flushed
    +                           testHarness.snapshot(123L, 123L);
                        }
                };
    -           Thread threadB = new Thread(confirmer);
    -           threadB.start();
    +           snapshotThread.start();
     
    -           // this should block:
    -           testHarness.snapshot(0, 0);
    +           // let the 2nd message fail with an async exception
    +           producer.getPendingCallbacks().get(1).onCompletion(null, new 
Exception("artificial async failure for 2nd message"));
    +           producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -           synchronized (threadA) {
    -                   threadA.notifyAll(); // just in case, to let the test 
fail faster
    -           }
    -           Assert.assertEquals(0, 
producer.getProducerInstance().getPending().size());
    -           Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -           while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -                   threadB.join(500);
    -           }
    -           Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
    -           if (runnableError.f0 != null) {
    -                   throw runnableError.f0;
    +           try {
    +                   snapshotThread.sync();
    +           } catch (Exception e) {
    +                   // the snapshot should have failed with the async 
exception
    +                   
Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure 
for 2nd message"));
    +
    +                   // test succeeded
    +                   return;
                }
     
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that the producer is not dropping buffered records;
    +    * we set a timeout because the test will not finish if the logic is 
broken
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout=10000)
    +   public void testAtLeastOnceProducer() throws Throwable {
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), 
any(Callback.class));
    +           Assert.assertEquals(3, producer.getPendingSize());
    +
    +           // start a thread to perform checkpointing
    +           CheckedThread snapshotThread = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           // this should block until all records are 
flushed;
    +                           // if the snapshot implementation returns 
before pending records are flushed,
    +                           testHarness.snapshot(123L, 123L);
    +                   }
    +           };
    +           snapshotThread.start();
    +
    +           // being extra safe that the snapshot is correctly blocked;
    +           // after some arbitrary time, the snapshot should still be 
blocked
    +           snapshotThread.join(3000);
    --- End diff --
    
    Blocking for 3 seconds is not good. What you're doing here is basically a 
`sleep`. It would be better to wait on a latch that is triggered when you 
enter the flush method, for example.


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint succeeded only if, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check for and rethrow async exceptions in the 
> {{snapshotState}} method both before flushing and after flushing, once 
> {{pendingRecords}} becomes 0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to