GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2180

    [FLINK-4080][kinesis-connector] Guarantee exactly-once for Kinesis consumer 
when fail in middle of aggregated records

    If multiple Kinesis records were aggregated into a single record by the 
Kinesis Producer Library (KPL), then after deaggregation at the consumer all 
of the resulting subrecords have the same sequence number. This breaks the 
exactly-once guarantee of the `FlinkKinesisConsumer` if it fails while it is 
still in the middle of processing a batch of deaggregated records (the first 
subrecord's sequence number will incorrectly mark the whole batch of 
aggregated records as processed).
    
    To fix this, this PR changes the snapshot state type of 
`FlinkKinesisConsumer` from `HashMap<KinesisStreamShard, String>` to 
`HashMap<KinesisStreamShard, SequenceNumber>`.
    `SequenceNumber` is a new class that represents a combination of a "main 
sequence number" and a "subsequence number". When the `ShardConsumerThread` 
starts consuming records, we check whether the last record restored from the 
snapshot was an aggregated record. If so, we first handle its dangling 
subrecords.
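
    To illustrate the idea, here is a minimal sketch in Java. It is not the 
code in this PR: the class name `SequenceNumberSketch`, the `-1` sentinel for 
non-aggregated records, and the helper `danglingSubSequences` are all 
assumptions made for the example. It shows how pairing the main sequence 
number with a subsequence number lets a restored consumer work out which 
subrecords of an aggregated record are still unprocessed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of a (main sequence number, subsequence number) pair. */
final class SequenceNumberSketch {
    // Assumed sentinel: the record was not part of a KPL-aggregated record.
    static final long NOT_AGGREGATED = -1L;

    private final String sequenceNumber;   // sequence number shared by all subrecords
    private final long subSequenceNumber;  // position inside the aggregated record

    SequenceNumberSketch(String sequenceNumber) {
        this(sequenceNumber, NOT_AGGREGATED);
    }

    SequenceNumberSketch(String sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = Objects.requireNonNull(sequenceNumber);
        this.subSequenceNumber = subSequenceNumber;
    }

    String getSequenceNumber() { return sequenceNumber; }

    long getSubSequenceNumber() { return subSequenceNumber; }

    boolean isAggregated() { return subSequenceNumber >= 0; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SequenceNumberSketch)) {
            return false;
        }
        SequenceNumberSketch other = (SequenceNumberSketch) o;
        return sequenceNumber.equals(other.sequenceNumber)
                && subSequenceNumber == other.subSequenceNumber;
    }

    @Override
    public int hashCode() {
        return Objects.hash(sequenceNumber, subSequenceNumber);
    }

    /**
     * Returns the subsequence numbers that still need processing after a
     * restore, given the last processed position and the number of
     * subrecords in the aggregated record. Empty if the last processed
     * record was not aggregated or was the final subrecord.
     */
    static List<Long> danglingSubSequences(SequenceNumberSketch lastProcessed,
                                           long aggregateSize) {
        List<Long> remaining = new ArrayList<>();
        if (!lastProcessed.isAggregated()) {
            return remaining;
        }
        for (long i = lastProcessed.subSequenceNumber + 1; i < aggregateSize; i++) {
            remaining.add(i);
        }
        return remaining;
    }
}
```

    For example, if the snapshot holds subsequence number 1 for an aggregated 
record containing four subrecords, the sketch reports subrecords 2 and 3 as 
dangling, so only those two are emitted again before normal consumption 
resumes.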
    
    @rmetzger I'm adding this change to the original Kinesis connector in 
`master` instead of waiting for the big PR #2131 to be merged, because I think 
this is a bug we must fix before 1.1, and I'm not sure if #2131 will be merged 
before the RC for 1.1 comes out. Depending on whether #2131 or this PR gets 
merged first, I'll rebase the other one accordingly.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4080

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2180
    
----
commit a9593677a4c73c9475b1e85204002d6470f2115a
Author: Gordon Tai <[email protected]>
Date:   2016-06-29T07:46:35Z

    [FLINK-4080] Guarantee exactly-once for Kinesis consumer for failures in 
the middle of aggregated records

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
