GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5269

    [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip non-deserializable records

    ## What is the purpose of the change
    
    This PR is based on #5268, which includes fixes to harden Kinesis unit tests. Only the last commit is relevant.
    
    Previously, we allowed the Flink Kafka Consumer to skip corrupted Kafka records that cannot be deserialized. In real-world pipelines, it is entirely normal for this to happen.
    
    This PR adds the same functionality to the Flink Kinesis Consumer.
    
    ## Brief change log
    
    - Clarify in the Javadoc of `KinesisDeserializationSchema` that `null` can be returned if a message cannot be deserialized.
    - If `record` is `null` in `KinesisDataFetcher::emitRecordAndUpdateState(...)`, do not collect any output for the record.
    - Add `KinesisDataFetcherTest::testSkipCorruptedRecord()` to verify the feature.
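To illustrate the `null` contract described in the change log, here is a minimal Java sketch. It does not use Flink's actual `KinesisDeserializationSchema`, whose `deserialize` method takes additional parameters such as the partition key and sequence number; `RecordDeserializer` and `IntegerDeserializer` are hypothetical names chosen for illustration. A payload that cannot be parsed yields `null` instead of an exception, which is the signal the fetcher would use to skip the record.

```java
import java.nio.charset.StandardCharsets;

public class SkipCorruptedRecordSketch {

    /** Hypothetical, simplified schema: returns null when the bytes cannot be deserialized. */
    interface RecordDeserializer<T> {
        T deserialize(byte[] recordValue);
    }

    /** Parses UTF-8 encoded integers; malformed payloads yield null instead of throwing. */
    static final class IntegerDeserializer implements RecordDeserializer<Integer> {
        @Override
        public Integer deserialize(byte[] recordValue) {
            String text = new String(recordValue, StandardCharsets.UTF_8).trim();
            try {
                return Integer.parseInt(text);
            } catch (NumberFormatException e) {
                // Corrupted record: tell the caller to skip it rather than failing the job
                return null;
            }
        }
    }

    public static void main(String[] args) {
        RecordDeserializer<Integer> schema = new IntegerDeserializer();
        System.out.println(schema.deserialize("42".getBytes(StandardCharsets.UTF_8)));           // 42
        System.out.println(schema.deserialize("not-a-number".getBytes(StandardCharsets.UTF_8))); // null
    }
}
```

Returning `null` keeps the decision of what counts as "corrupted" inside the user's schema, while the consumer only needs a single null check.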
    
    ## Verifying this change
    
    The new `KinesisDataFetcherTest::testSkipCorruptedRecord()` test verifies this change.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6004

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5269
    
----
commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-10T02:11:31Z

    [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes
    
    The previous implementation of the TestableKinesisDataFetcher was
    confusing in various ways, making it hard to reuse for other
    tests. This commit contains the following cleanups:
    
    - Remove confusing mocks of source context and checkpoint lock. We now
      allow users of the TestableKinesisDataFetcher to provide a source
      context, which should provide the checkpoint lock.
    - Remove override of emitRecordAndUpdateState(). Strictly speaking, that
      method should be final. It was previously overridden to allow
      verifying how many records were output by the fetcher. That
      verification would be better implemented within a mock source context.
    - Properly parameterize the output type for the
      TestableKinesisDataFetcher.
    - Remove use of PowerMockito in KinesisDataFetcherTest.
    - Use CheckedThreads to properly capture any exceptions in fetcher /
      consumer threads in unit tests.
    - Use assertEquals / assertNull instead of assertTrue wherever
      appropriate.
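The idea of verifying fetcher output through a mock source context, rather than by overriding `emitRecordAndUpdateState()`, can be sketched roughly as follows. `MiniSourceContext` and `CollectingSourceContext` are hypothetical, trimmed-down stand-ins and not Flink's `SourceFunction.SourceContext`; the test double owns the checkpoint lock and records every collected element so that a test can assert on the output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectingSourceContextSketch {

    /** Hypothetical, trimmed-down source context: only what a fetcher needs. */
    interface MiniSourceContext<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    /** Test double that provides the checkpoint lock and records every emitted element. */
    static final class CollectingSourceContext<T> implements MiniSourceContext<T> {
        private final Object lock = new Object();
        private final List<T> collected = new ArrayList<>();

        @Override
        public void collect(T element) {
            synchronized (lock) {
                collected.add(element);
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lock;
        }

        /** Returns a snapshot of everything collected so far. */
        List<T> getCollected() {
            synchronized (lock) {
                return Collections.unmodifiableList(new ArrayList<>(collected));
            }
        }
    }

    public static void main(String[] args) {
        CollectingSourceContext<String> ctx = new CollectingSourceContext<>();
        // A fetcher under test would call collect(...) while holding the checkpoint lock
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect("a");
            ctx.collect("b");
        }
        System.out.println(ctx.getCollected().size()); // 2
    }
}
```

Because the output is observed at the context rather than inside the fetcher, the fetcher's emit method can stay final and untouched by tests.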

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-10T05:41:49Z

    [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests
    
    Prior to this commit, several unit tests in KinesisDataFetcherTest
    relied on sleeps to wait until certain operations had happened, in
    order for the tests to pass.
    
    This commit removes those sleeps and instead uses OneShotLatches to
    wait for those operations explicitly.
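A minimal sketch of the sleep-to-latch pattern, assuming a plain `java.util.concurrent.CountDownLatch` with a count of 1 as a stand-in for Flink's `OneShotLatch` (whose API is not shown in this PR). `waitForSignal` and the worker thread are hypothetical; the point is that the test blocks until the awaited operation signals completion, with a timeout, instead of sleeping for a fixed duration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchInsteadOfSleepSketch {

    /**
     * Starts a worker that performs an operation and then triggers a one-shot latch.
     * Returns true if the signal arrived within the timeout and the operation's effect is visible.
     */
    static boolean waitForSignal(long timeoutSeconds) {
        // Count of 1: behaves like a latch that can be triggered exactly once
        CountDownLatch operationDone = new CountDownLatch(1);
        AtomicBoolean operationEffect = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            operationEffect.set(true);  // the operation the test is waiting for
            operationDone.countDown();  // trigger the latch instead of letting the test guess
        });
        worker.start();

        try {
            // Before: Thread.sleep(...) and hope the operation has finished.
            // After: block until triggered, and report failure on timeout.
            boolean signalled = operationDone.await(timeoutSeconds, TimeUnit.SECONDS);
            worker.join();
            return signalled && operationEffect.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForSignal(10)); // true
    }
}
```

The timeout only bounds the worst case; in the normal case the test proceeds as soon as the latch fires, which removes both flakiness and wasted wall-clock time.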

commit 8d2b086ae7133999ec03620e3434cd659fd8d9d3
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-10T06:04:10Z

    [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records
    
    This commit acknowledges that null can be returned from the
    deserialization schema, if the message cannot be deserialized. If null
    is returned for a Kinesis record, no output is produced for that record,
    while the sequence number in the shard state is still advanced so that
    the record is effectively accounted for as processed.
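The skip-but-advance behavior can be sketched as below. This is not the actual `KinesisDataFetcher` code: `Fetcher`, `ShardState`, and the list-based output are hypothetical simplifications, and the synchronized block only mirrors the idea of emitting and updating state under a checkpoint lock. A `null` record produces no output, but the shard's last sequence number still moves forward, so the corrupted record is not re-read after restoring from a checkpoint taken past that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class EmitAndUpdateStateSketch {

    /** Hypothetical per-shard state: only the last processed sequence number is tracked. */
    static final class ShardState {
        String lastSequenceNumber;
    }

    /** Hypothetical fetcher that collects output into a list instead of a source context. */
    static final class Fetcher<T> {
        final List<T> output = new ArrayList<>();
        final Object checkpointLock = new Object();

        void emitRecordAndUpdateState(T record, ShardState state, String sequenceNumber) {
            // Emit and advance state under one lock, so a checkpoint taken under
            // the same lock sees both updates or neither
            synchronized (checkpointLock) {
                if (record != null) {
                    output.add(record); // only successfully deserialized records are emitted
                }
                // Advance even when record is null: the corrupted record counts as processed
                state.lastSequenceNumber = sequenceNumber;
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical schema: returns null for payloads that are not integers
        Function<String, Integer> schema = payload -> {
            try {
                return Integer.parseInt(payload);
            } catch (NumberFormatException e) {
                return null;
            }
        };

        Fetcher<Integer> fetcher = new Fetcher<>();
        ShardState state = new ShardState();
        String[][] shardRecords = {{"1", "seq-1"}, {"2", "seq-2"}, {"corrupt", "seq-3"}};
        for (String[] r : shardRecords) {
            fetcher.emitRecordAndUpdateState(schema.apply(r[0]), state, r[1]);
        }
        System.out.println(fetcher.output);           // [1, 2]
        System.out.println(state.lastSequenceNumber); // seq-3
    }
}
```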

----

