Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4871#discussion_r146204340
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -49,61 +71,205 @@ public void testCreateWithNonSerializableDeserializerFails() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("The provided serialization schema is not serializable");
     
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -           testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig);
    +           new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
        }
     
        @Test
        public void testCreateWithSerializableDeserializer() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -           testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig);
    +           new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
        }
     
        @Test
        public void testConfigureWithNonSerializableCustomPartitionerFails() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("The provided custom partitioner is not serializable");
     
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -           testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +           new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
                        .setCustomPartitioner(new NonSerializableCustomPartitioner());
        }
     
        @Test
        public void testConfigureWithSerializableCustomPartitioner() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -           testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +           new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
                        .setCustomPartitioner(new SerializableCustomPartitioner());
        }
     
        @Test
        public void testConsumerIsSerializable() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -           testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
    +           FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
                assertTrue(InstantiationUtil.isSerializable(consumer));
        }
     
        // ----------------------------------------------------------------------
    +   // Tests to verify at-least-once guarantee
    +   // ----------------------------------------------------------------------
    +
    +   /**
    +    * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +    * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +    *
    +    * <p>Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
    +    * The test for that is covered in testAtLeastOnceProducer.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownAfterFlush() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           // only let the first record succeed for now
    +           UserRecordResult result = mock(UserRecordResult.class);
    +           when(result.isSuccessful()).thenReturn(true);
    +           producer.getPendingRecordFutures().get(0).set(result);
    +
    +           CheckedThread snapshotThread = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           // this should block at first, since there are still two pending records that need to be flushed
    +                           testHarness.snapshot(123L, 123L);
    +                   }
    +           };
    +           snapshotThread.start();
    +
    +           // let the 2nd message fail with an async exception
    +           producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
    +           producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));
    +
    +           try {
    +                   snapshotThread.sync();
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   e.printStackTrace();
    +                   Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that the producer is not dropping buffered records;
    +    * we set a timeout because the test will not finish if the logic is broken.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testAtLeastOnceProducer() throws Throwable {
    --- End diff --
    
    I am not sure if this test is good enough:
    
    1. It is not testing the code that it should :( it is using an overridden version of `FlinkKinesisProducer` - the `DummyFlinkKinesisProducer` can hide bugs in the real implementation (see the sketch below).
    2. Implementing it as a unit test with mocks doesn't test our integration with `Kinesis`. You made some assumptions about how `at-least-once` should be implemented, you implemented them in the production code, and here you are repeating the same code based on the same assumptions :(
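    
    To illustrate the first point, here is a rough, purely hypothetical sketch of the pattern I mean (class and method names are illustrative, not the actual `FlinkKinesisProducer`/`DummyFlinkKinesisProducer` code): whatever the test subclass stubs out - client creation, record submission, flushing - is exactly the production code this unit test can no longer catch bugs in.
    
    ```java
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    
    import static org.mockito.Mockito.mock;
    
    // Hypothetical production sink: it builds and drives a real KPL client.
    class ProductionSink {
        protected KinesisProducer createClient(KinesisProducerConfiguration config) {
            return new KinesisProducer(config);
        }
    }
    
    // Hypothetical test subclass: it swaps the real client for a Mockito mock, so the
    // real create / addUserRecord / flush code path is never executed by any unit test
    // that is built on top of this subclass.
    class DummyTestSink extends ProductionSink {
        @Override
        protected KinesisProducer createClient(KinesisProducerConfiguration config) {
            return mock(KinesisProducer.class);
        }
    }
    ```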
    
    However, looking at the instability of the Kafka tests, I'm not sure which approach is worse... unless those are not test instabilities but bugs in our code that Kafka's ITCases trigger from time to time - a Mockito-based test like this one would not discover such bugs.

