GitHub user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5269#discussion_r165038908
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
---
@@ -86,6 +88,56 @@ public void testIfNoShardsAreFoundShouldThrowException()
throws Exception {
fetcher.runFetcher(); // this should throw RuntimeException
}
+ @Test
+ public void testSkipCorruptedRecord() throws Exception {
+ final String stream = "fakeStream";
+ final int numShards = 3;
+
+ final LinkedList<KinesisStreamShardState> testShardStates = new
LinkedList<>();
+ final TestSourceContext<String> sourceContext = new
TestSourceContext<>();
+
+ final TestableKinesisDataFetcher<String> fetcher = new
TestableKinesisDataFetcher<>(
+ Collections.singletonList(stream),
+ sourceContext,
+ TestUtils.getStandardProperties(),
+ new KinesisDeserializationSchemaWrapper<>(new
SimpleStringSchema()),
+ 1,
+ 0,
+ new AtomicReference<>(),
+ testShardStates,
+ new HashMap<>(),
+
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(Collections.singletonMap(stream,
numShards)));
+
+ // FlinkKinesisConsumer is responsible for setting up the
fetcher before it can be run;
+ // run the consumer until it reaches the point where the
fetcher starts to run
+ final DummyFlinkKafkaConsumer<String> consumer = new
DummyFlinkKafkaConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
+
+ CheckedThread consumerThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ consumer.run(new TestSourceContext<>());
+ }
+ };
+ consumerThread.start();
+
+ fetcher.waitUntilRun();
+ consumer.cancel();
+ consumerThread.sync();
+
+ assertEquals(numShards, testShardStates.size());
+
+ for (int i = 0; i < numShards; i++) {
+ fetcher.emitRecordAndUpdateState("record-" + i, 10L, i,
new SequenceNumber("seq-num-1"));
+ assertEquals(new SequenceNumber("seq-num-1"),
testShardStates.get(i).getLastProcessedSequenceNum());
+ assertEquals(new StreamRecord<>("record-" + i,
10L), sourceContext.removeLatestOutput());
--- End diff --
Will fix.
---