Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4871#discussion_r146200480
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
---
@@ -49,61 +71,205 @@ public void
testCreateWithNonSerializableDeserializerFails() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided serialization schema is not serializable");
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new
NonSerializableSerializationSchema(), testConfig);
+ new FlinkKinesisProducer<>(new
NonSerializableSerializationSchema(), getStandardProperties());
}
@Test
public void testCreateWithSerializableDeserializer() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new
SerializableSerializationSchema(), testConfig);
+ new FlinkKinesisProducer<>(new
SerializableSerializationSchema(), getStandardProperties());
}
@Test
public void testConfigureWithNonSerializableCustomPartitionerFails() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided custom partitioner is not serializable");
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+ new FlinkKinesisProducer<>(new SimpleStringSchema(),
getStandardProperties())
.setCustomPartitioner(new
NonSerializableCustomPartitioner());
}
@Test
public void testConfigureWithSerializableCustomPartitioner() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+ new FlinkKinesisProducer<>(new SimpleStringSchema(),
getStandardProperties())
.setCustomPartitioner(new
SerializableCustomPartitioner());
}
@Test
public void testConsumerIsSerializable() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
-
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- FlinkKinesisProducer<String> consumer = new
FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
+ FlinkKinesisProducer<String> consumer = new
FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
assertTrue(InstantiationUtil.isSerializable(consumer));
}
// ----------------------------------------------------------------------
+ // Tests to verify at-least-once guarantee
+ // ----------------------------------------------------------------------
+
+ /**
+ * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ producer.getPendingRecordFutures().get(0).setException(new
Exception("artificial async exception"));
+
+ try {
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+ Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ producer.getPendingRecordFutures().get(0).setException(new
Exception("artificial async exception"));
+
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+ Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
+ * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
--- End diff --
Did you forget to set the timeout?
---