[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338612#comment-15338612
 ] 

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2131

    [FLINK-3231][streaming-connectors] FlinkKinesisConsumer rework to handle 
Kinesis resharding

    This change attempts to solve 2 issues:
    1. [FLINK-3231](https://issues.apache.org/jira/browse/FLINK-3231): Handle 
Kinesis-side resharding.
    2. [FLINK-4020](https://issues.apache.org/jira/browse/FLINK-4020): Remove 
shard list querying from Kinesis consumer constructor.
    
    Some notes on the implementation:
    - All subtasks has a thread that continuously polls for changes in the 
Kinesis stream, and uses exponential backoff with jitter to try to even out the 
concurrent Kinesis client describeStream operations across subtasks. 
Continuously polling is necessary because there's currently no way to "signal" 
a subtask that it has a new shard it should be subscribing to.
    - A big change is that all subtasks run a fetcher that continues to poll 
for shards, even if the subtask initially didn't have shards to consume 
(before, a MAX_VALUE waterwark was sent out).
    - Apart from the unit tests, I've manually tested this with 
`ManualExactlyOnceWithStreamReshardingTest`. However, since the 
`FlinkKinesisProducer` currently has some problem that records are resent when 
Kinesis streams are resharded (thus not allowing the exactly-once test to pass 
at all), this manual test uses a normal event generator instead of a producer 
topology running the `FlinkKinesisProducer`.
    
    Since this PR introduces considerable rework on the Kinesis consumer, I'll 
wait until this is merged before submitting 
[FLINK-4080](https://issues.apache.org/jira/browse/FLINK-4080) & 
[FLINK-4019](https://issues.apache.org/jira/browse/FLINK-4019).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-3231

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2131
    
----
commit 378ec8177f1bfe91b459233a8ce02e9f988c61ab
Author: Gordon Tai <[email protected]>
Date:   2016-06-08T10:46:02Z

    [FLINK-4020] Move shard list querying to open() for Kinesis consumer

commit 2c9f1304d5f6220fe36ad9d7833a506651f3fee6
Author: Gordon Tai <[email protected]>
Date:   2016-06-19T16:15:43Z

    [FLINK-3231] FlinkKinesisConsumer rework to handle Kinesis resharding

----


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to