Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4871#discussion_r146204340 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java --- @@ -49,61 +71,205 @@ public void testCreateWithNonSerializableDeserializerFails() { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided serialization schema is not serializable"); - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig); + new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties()); } @Test public void testCreateWithSerializableDeserializer() { - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig); + new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties()); } @Test public void testConfigureWithNonSerializableCustomPartitionerFails() { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided custom partitioner is not serializable"); - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new SimpleStringSchema(), 
testConfig) + new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) .setCustomPartitioner(new NonSerializableCustomPartitioner()); } @Test public void testConfigureWithSerializableCustomPartitioner() { - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig) + new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) .setCustomPartitioner(new SerializableCustomPartitioner()); } @Test public void testConsumerIsSerializable() { - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig); + FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()); assertTrue(InstantiationUtil.isSerializable(consumer)); } // ---------------------------------------------------------------------- + // Tests to verify at-least-once guarantee + // ---------------------------------------------------------------------- + + /** + * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void testAsyncErrorRethrownOnInvoke() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + + producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception")); + + try { + testHarness.processElement(new StreamRecord<>("msg-2")); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent()); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void testAsyncErrorRethrownOnCheckpoint() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + + producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception")); + + try { + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent()); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint, + * it should be rethrown; we set a timeout because the test will not finish if the logic is broken. + * + * <p>Note that this test does not test the snapshot method is blocked correctly when there are pending recorrds. + * The test for that is covered in testAtLeastOnceProducer. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void testAsyncErrorRethrownAfterFlush() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + + // only let the first record succeed for now + UserRecordResult result = mock(UserRecordResult.class); + when(result.isSuccessful()).thenReturn(true); + producer.getPendingRecordFutures().get(0).set(result); + + CheckedThread snapshotThread = new CheckedThread() { + @Override + public void go() throws Exception { + // this should block at first, since there are still two pending records that needs to be flushed + testHarness.snapshot(123L, 123L); + } + }; + snapshotThread.start(); + + // let the 2nd message fail with an async exception + producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message")); + producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class)); + + try { + snapshotThread.sync(); + } catch (Exception e) { + // the next invoke should rethrow the async exception + e.printStackTrace(); + Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent()); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Test ensuring that the producer is not dropping buffered records; + * we set a timeout because the test will not finish if the logic is broken. + */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testAtLeastOnceProducer() throws Throwable { --- End diff -- I am not sure if this test is good enough: 1. 
It is not testing the code that it should :( it is using an overridden version of the `FlinkKinesisProducer` - `DummyFlinkKinesisProducer` can hide bugs in the real implementation. 2. Implementing it as a unit test with mocks doesn't test our integration with `Kinesis`. You made some assumptions about how `at-least-once` should be implemented, you implemented it in production code and here you are repeating the same code using the same assumptions :( However looking at Kafka test instability I'm not sure which approach is worse... Unless those are not test instabilities but bugs in our code, which Kafka's ITCases are triggering from time to time - this Mockito-based test would not discover such bugs.
---