[ https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346746#comment-16346746 ]

ASF GitHub Bot commented on FLINK-6004:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5269#discussion_r165038908
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java ---
    @@ -86,6 +88,56 @@ public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
                fetcher.runFetcher(); // this should throw RuntimeException
        }
     
    +   @Test
    +   public void testSkipCorruptedRecord() throws Exception {
    +           final String stream = "fakeStream";
    +           final int numShards = 3;
    +
    +           final LinkedList<KinesisStreamShardState> testShardStates = new LinkedList<>();
    +           final TestSourceContext<String> sourceContext = new TestSourceContext<>();
    +
    +           final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
    +                   Collections.singletonList(stream),
    +                   sourceContext,
    +                   TestUtils.getStandardProperties(),
    +                   new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
    +                   1,
    +                   0,
    +                   new AtomicReference<>(),
    +                   testShardStates,
    +                   new HashMap<>(),
    +                   FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(Collections.singletonMap(stream, numShards)));
    +
    +           // FlinkKinesisConsumer is responsible for setting up the fetcher before it can be run;
    +           // run the consumer until it reaches the point where the fetcher starts to run
    +           final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
    +
    +           CheckedThread consumerThread = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           consumer.run(new TestSourceContext<>());
    +                   }
    +           };
    +           consumerThread.start();
    +
    +           fetcher.waitUntilRun();
    +           consumer.cancel();
    +           consumerThread.sync();
    +
    +           assertEquals(numShards, testShardStates.size());
    +
    +           for (int i = 0; i < numShards; i++) {
    +                   fetcher.emitRecordAndUpdateState("record-" + i, 10L, i, new SequenceNumber("seq-num-1"));
    +                   assertEquals(new SequenceNumber("seq-num-1"), testShardStates.get(i).getLastProcessedSequenceNum());
    +                           assertEquals(new StreamRecord<>("record-" + i, 10L), sourceContext.removeLatestOutput());
    --- End diff --
    
    Will fix.


> Allow FlinkKinesisConsumer to skip corrupted messages
> -----------------------------------------------------
>
>                 Key: FLINK-6004
>                 URL: https://issues.apache.org/jira/browse/FLINK-6004
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> As the fix for FLINK-3679 made clear, users may in practice encounter 
> corrupted messages from Kafka, Kinesis, or external sources in general 
> when deserializing them.
> The consumers should support simply skipping such messages: the 
> deserialization schema returns {{null}} for a record it cannot parse, and 
> the consumer checks for {{null}} values and drops them.
> This has already been done for the Kafka consumer. This ticket tracks the 
> same improvement for the Kinesis consumer.
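
For illustration, the skipping mechanism described above can be sketched as a deserialization schema that returns {{null}} for payloads it cannot parse. This is not code from the PR; the class name, the "v1|" payload framing, and the exact {{KinesisDeserializationSchema}} method signature are assumptions made for this sketch:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

/**
 * Sketch of a schema that skips corrupted records by returning null.
 * The "v1|" payload framing is a made-up example format.
 */
public class SkipCorruptedRecordSchema implements KinesisDeserializationSchema<String> {

	@Override
	public String deserialize(
			byte[] recordValue,
			String partitionKey,
			String seqNum,
			long approxArrivalTimestamp,
			String stream,
			String shardId) throws IOException {
		String payload = new String(recordValue, StandardCharsets.UTF_8);
		if (!payload.startsWith("v1|")) {
			// Returning null marks the record as corrupted; with this
			// improvement the consumer skips it instead of failing.
			return null;
		}
		return payload.substring(3);
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}
{code}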



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
