GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101973453
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 ---
    @@ -88,201 +85,321 @@ public void testKeyValueDeserializersSetIfMissing() 
throws Exception {
        @Test
        public void testPartitionerOpenedWithDeterminatePartitionList() throws 
Exception {
                KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
                RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
                when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
                
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    -
    -           DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    +           
    +           // out-of-order list of 4 partitions
    +           List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +           mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +
    +           final DummyFlinkKafkaProducer producer = new 
DummyFlinkKafkaProducer(
                        FakeStandardProducerConfig.get(), mockPartitioner);
                producer.setRuntimeContext(mockRuntimeContext);
     
    +           final KafkaProducer mockProducer = 
producer.getMockKafkaProducer();
    +           
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +           when(mockProducer.metrics()).thenReturn(null);
    +
                producer.open(new Configuration());
     
    -           // the internal mock KafkaProducer will return an out-of-order 
list of 4 partitions,
    -           // which should be sorted before provided to the custom 
partitioner's open() method
    +           // the out-of-order partitions list should be sorted before 
provided to the custom partitioner's open() method
                int[] correctPartitionList = {0, 1, 2, 3};
                verify(mockPartitioner).open(0, 1, correctPartitionList);
        }
     
        /**
    -    * Test ensuring that the producer is not dropping buffered records.;
    -    * we set a timeout because the test will not finish if the logic is 
broken
    +    * Test ensuring that if an invoke call happens right after an async 
exception is caught, it should be rethrown
         */
    -   @Test(timeout=5000)
    -   public void testAtLeastOnceProducer() throws Throwable {
    -           runAtLeastOnceTest(true);
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(e.getCause().getMessage().contains("artificial async 
exception"));
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
        }
     
        /**
    -    * Ensures that the at least once producing test fails if the flushing 
is disabled
    +    * Test ensuring that if a snapshot call happens right after an async 
exception is caught, it should be rethrown
         */
    -   @Test(expected = AssertionError.class, timeout=5000)
    -   public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws 
Throwable {
    -           runAtLeastOnceTest(false);
    -   }
    -
    -   private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws 
Throwable {
    -           final AtomicBoolean snapshottingFinished = new 
AtomicBoolean(false);
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    -                   FakeStandardProducerConfig.get(), null, 
snapshottingFinished);
    -           producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +                   FakeStandardProducerConfig.get(), null);
     
                OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -                           new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
     
                testHarness.open();
     
    -           for (int i = 0; i < 100; i++) {
    -                   testHarness.processElement(new StreamRecord<>("msg-" + 
i));
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(e.getCause().getMessage().contains("artificial async 
exception"));
    +
    +                   // test succeeded
    +                   return;
                }
     
    -           // start a thread confirming all pending records
    -           final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -           final Thread threadA = Thread.currentThread();
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if an async exception is caught for one of the 
flushed requests on checkpoint,
    +    * it should be rethrown; we set a timeout because the test will not 
finish if the logic is broken.
    +    *
    +    * Note that this test does not test the snapshot method is blocked 
correctly when there are pending recorrds.
    +    * The test for that is covered in testAtLeastOnceProducer.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout=5000)
    +   public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws 
Throwable {
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), 
any(Callback.class));
    +
    +           // only let the first callback succeed for now
    +           producer.getPendingCallbacks().get(0).onCompletion(null, null);
     
    -           Runnable confirmer = new Runnable() {
    +           CheckedThread snapshotThread = new CheckedThread() {
                        @Override
    -                   public void run() {
    -                           try {
    -                                   MockProducer mp = 
producer.getProducerInstance();
    -                                   List<Callback> pending = 
mp.getPending();
    -
    -                                   // we need to find out if the 
snapshot() method blocks forever
    -                                   // this is not possible. If snapshot() 
is running, it will
    -                                   // start removing elements from the 
pending list.
    -                                   synchronized (threadA) {
    -                                           threadA.wait(500L);
    -                                   }
    -                                   // we now check that no records have 
been confirmed yet
    -                                   Assert.assertEquals(100, 
pending.size());
    -                                   Assert.assertFalse("Snapshot method 
returned before all records were confirmed",
    -                                           snapshottingFinished.get());
    -
    -                                   // now confirm all checkpoints
    -                                   for (Callback c: pending) {
    -                                           c.onCompletion(null, null);
    -                                   }
    -                                   pending.clear();
    -                           } catch(Throwable t) {
    -                                   runnableError.f0 = t;
    -                           }
    +                   public void go() throws Exception {
    +                           // this should block at first, since there are 
still two pending records that needs to be flushed
    +                           testHarness.snapshot(123L, 123L);
                        }
                };
    -           Thread threadB = new Thread(confirmer);
    -           threadB.start();
    +           snapshotThread.start();
     
    -           // this should block:
    -           testHarness.snapshot(0, 0);
    +           // let the 2nd message fail with an async exception
    +           producer.getPendingCallbacks().get(1).onCompletion(null, new 
Exception("artificial async failure for 2nd message"));
    +           producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -           synchronized (threadA) {
    -                   threadA.notifyAll(); // just in case, to let the test 
fail faster
    -           }
    -           Assert.assertEquals(0, 
producer.getProducerInstance().getPending().size());
    -           Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -           while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -                   threadB.join(500);
    -           }
    -           Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
    -           if (runnableError.f0 != null) {
    -                   throw runnableError.f0;
    +           try {
    +                   snapshotThread.sync();
    +           } catch (Exception e) {
    +                   // the snapshot should have failed with the async 
exception
    +                   
Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure 
for 2nd message"));
    +
    +                   // test succeeded
    +                   return;
                }
     
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that the producer is not dropping buffered records;
    +    * we set a timeout because the test will not finish if the logic is 
broken
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout=10000)
    +   public void testAtLeastOnceProducer() throws Throwable {
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), 
any(Callback.class));
    +           Assert.assertEquals(3, producer.getPendingSize());
    +
    +           // start a thread to perform checkpointing
    +           CheckedThread snapshotThread = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           // this should block until all records are 
flushed;
    +                           // if the snapshot implementation returns 
before pending records are flushed,
    +                           testHarness.snapshot(123L, 123L);
    +                   }
    +           };
    +           snapshotThread.start();
    +
    +           // being extra safe that the snapshot is correctly blocked;
    +           // after some arbitrary time, the snapshot should still be 
blocked
    +           snapshotThread.join(3000);
    --- End diff --
    
    Now I see what you mean, and I understand the purpose of the latch you
proposed earlier. Thanks for the comment. I will address this and proceed to
merge the PR later today.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to