Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4871#discussion_r146204340
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
---
@@ -49,61 +71,205 @@ public void
testCreateWithNonSerializableDeserializerFails() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided serialization schema is
not serializable");
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new
NonSerializableSerializationSchema(), testConfig);
+ new FlinkKinesisProducer<>(new
NonSerializableSerializationSchema(), getStandardProperties());
}
@Test
public void testCreateWithSerializableDeserializer() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new
SerializableSerializationSchema(), testConfig);
+ new FlinkKinesisProducer<>(new
SerializableSerializationSchema(), getStandardProperties());
}
@Test
public void testConfigureWithNonSerializableCustomPartitionerFails() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided custom partitioner is not
serializable");
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+ new FlinkKinesisProducer<>(new SimpleStringSchema(),
getStandardProperties())
.setCustomPartitioner(new
NonSerializableCustomPartitioner());
}
@Test
public void testConfigureWithSerializableCustomPartitioner() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+ new FlinkKinesisProducer<>(new SimpleStringSchema(),
getStandardProperties())
.setCustomPartitioner(new
SerializableCustomPartitioner());
}
@Test
public void testConsumerIsSerializable() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- FlinkKinesisProducer<String> consumer = new
FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
+ FlinkKinesisProducer<String> consumer = new
FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
assertTrue(InstantiationUtil.isSerializable(consumer));
}
//
----------------------------------------------------------------------
+ // Tests to verify at-least-once guarantee
+ //
----------------------------------------------------------------------
+
+ /**
+ * Test ensuring that if an invoke call happens right after an async
exception is caught, it should be rethrown.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ producer.getPendingRecordFutures().get(0).setException(new
Exception("artificial async exception"));
+
+ try {
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+
Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async
exception").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if a snapshot call happens right after an async
exception is caught, it should be rethrown.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ producer.getPendingRecordFutures().get(0).setException(new
Exception("artificial async exception"));
+
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+
Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async
exception").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if an async exception is caught for one of the
flushed requests on checkpoint,
+ * it should be rethrown; we set a timeout because the test will not
finish if the logic is broken.
+ *
+ * <p>Note that this test does not test that the snapshot method is blocked
correctly when there are pending records.
+ * The test for that is covered in testAtLeastOnceProducer.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownAfterFlush() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+
+ // only let the first record succeed for now
+ UserRecordResult result = mock(UserRecordResult.class);
+ when(result.isSuccessful()).thenReturn(true);
+ producer.getPendingRecordFutures().get(0).set(result);
+
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ // this should block at first, since there are
still two pending records that needs to be flushed
+ testHarness.snapshot(123L, 123L);
+ }
+ };
+ snapshotThread.start();
+
+ // let the 2nd message fail with an async exception
+ producer.getPendingRecordFutures().get(1).setException(new
Exception("artificial async failure for 2nd message"));
+
producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));
+
+ try {
+ snapshotThread.sync();
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+ e.printStackTrace();
+
Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async
failure for 2nd message").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that the producer is not dropping buffered records;
+ * we set a timeout because the test will not finish if the logic is
broken.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void testAtLeastOnceProducer() throws Throwable {
--- End diff --
I am not sure if this test is good enough:
1. It is not testing the code that it should :( — it is using an overridden
version of `FlinkKinesisProducer` — `DummyFlinkKinesisProducer` can hide
bugs in the real implementation.
2. Implementing it as a unit test with mocks doesn't test our
integration with `Kinesis`. You made some assumptions about how `at-least-once` should
be implemented, you implemented them in production code, and here you are
repeating the same code using the same assumptions :(
However, looking at the Kafka tests' instability, I'm not sure which approach is
worse... Unless those are not test instabilities but bugs in our code, which
Kafka's ITCases are triggering from time to time — this Mockito-based test
would not discover such bugs.
---