[ https://issues.apache.org/jira/browse/FLINK-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947987#comment-15947987 ]

ASF GitHub Bot commented on FLINK-4577:
---------------------------------------

Github user skidder commented on the issue:

    https://github.com/apache/flink/pull/3458
  
    @tzulitai I was able to scale up the number of shards on a Kinesis stream without any interruption in processing. Looks good!
    
    # Configuration
    ## Flink Cluster
    A single JobManager running as a standalone cluster, with a single TaskManager with 4 slots. Both were built from the source of this feature branch.
    
    ## Application
    Single Flink application running with parallelism of 4
    
    ## Kinesis stream
    Stream name `mux_video_events_staging` with one shard
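
    The application code itself isn't included here, so for reference this is a minimal sketch of how such a test job could be wired up. The region, job name, and plain print sink are assumptions of mine, not taken from the test; the initial position matches the LATEST_SEQUENCE_NUM seen in the logs below:
    ```java
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ReshardTestJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4); // matches the parallelism of 4 / 4 task slots above

            Properties props = new Properties();
            props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // assumed region
            // Start from the tip of the stream, matching LATEST_SEQUENCE_NUM in the logs.
            props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

            DataStream<String> events = env.addSource(new FlinkKinesisConsumer<>(
                    "mux_video_events_staging", new SimpleStringSchema(), props));

            events.print();
            env.execute("kinesis-reshard-test");
        }
    }
    ```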
    
    # Test Steps & Results
    On startup, the Flink application has Sub-task (1) read from the one shard on the stream:
    ```
    2017-03-29 15:55:03,306 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 1 will be seeded with initial shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49569797317567661038287361310393874557410775187880673282,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM
    2017-03-29 15:55:03,312 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 1 will start consuming seeded shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49569797317567661038287361310393874557410775187880673282,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
    ```
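
    For reference, the shard-count increases in the following steps can be performed with the Kinesis UpdateShardCount API (UNIFORM_SCALING matches the even hash-key splits visible in the logs); a minimal AWS SDK for Java sketch, assuming default credentials and region, and not necessarily the exact method used for this test:
    ```java
    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
    import com.amazonaws.services.kinesis.model.ScalingType;
    import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

    public class Reshard {
        public static void main(String[] args) {
            AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
            // Double the shard count; repeated later with a target of 4 for the 2 -> 4 step.
            kinesis.updateShardCount(new UpdateShardCountRequest()
                    .withStreamName("mux_video_events_staging")
                    .withTargetShardCount(2)
                    .withScalingType(ScalingType.UNIFORM_SCALING));
        }
    }
    ```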
    
    Next, I increased the number of shards from 1 to 2. Sub-task (1), previously responsible for reading from the one shard, is marked as temporarily idle (see the idleness sketch after the log excerpt); Sub-tasks (2) & (3) begin reading from the 2 new shards:
    ```
    2017-03-29 17:36:36,741 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 3 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000002,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49571795731277369408532401914933651574587478731630051362,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
    2017-03-29 17:36:38,606 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 1 has reached the end of subscribed shard: KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49569797317567661038287361310393874557410775187880673282,}}'}
    2017-03-29 17:36:38,606 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 1 has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...
    2017-03-29 17:36:45,240 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 2 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000001,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 170141183460469231731687303715884105728},SequenceNumberRange: {StartingSequenceNumber: 49571795731255068663333871291792115856314830370124070930,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
    ```
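
    The "temporarily idle" marking above presumably goes through the source-idleness API (`SourceContext#markAsTemporarilyIdle()`), not anything connector-specific. A minimal custom-source illustration of that call, purely to show the mechanism and not the connector's actual code:
    ```java
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Illustrative only: a source that marks itself idle when it has nothing
    // assigned, so downstream watermarks don't stall on this subtask.
    public class IdleAwareSource implements SourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                boolean hasAssignedWork = false; // e.g. all assigned shards are closed
                if (!hasAssignedWork) {
                    // Tell downstream operators not to wait on this subtask's watermarks.
                    ctx.markAsTemporarilyIdle();
                }
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
    ```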
    
    I then increased the number of shards from 2 to 4. Sub-task (0) reads from a new shard; Sub-tasks (2) & (3) stop reading from their closed shards and begin reading from the new shards; Sub-task (1), which was previously marked as temporarily idle, starts reading from a new shard:
    ```
    2017-03-29 17:41:58,005 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 2 has reached the end of subscribed shard: KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000001,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 170141183460469231731687303715884105728},SequenceNumberRange: {StartingSequenceNumber: 49571795731255068663333871291792115856314830370124070930,}}'}
    2017-03-29 17:41:58,393 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 1 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000004,ParentShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052865,EndingHashKey: 170141183460469231731687303715884105728},SequenceNumberRange: {StartingSequenceNumber: 49571795845144974392229763675615029074730034502679134274,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
    2017-03-29 17:41:59,602 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000003,ParentShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070591730234615865843651857942052864},SequenceNumberRange: {StartingSequenceNumber: 49571795845122673647031233052473493356457386141173153842,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
    2017-03-29 17:42:28,173 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 3 has reached the end of subscribed shard: KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000002,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49571795731277369408532401914933651574587478731630051362,}}'}
    2017-03-29 17:42:29,520 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 2 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000005,ParentShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey: 255211775190703847597530955573826158593},SequenceNumberRange: {StartingSequenceNumber: 49571795855871632832722993406693709563873898448640016466,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
    2017-03-29 17:42:31,183 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 3 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000006,ParentShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158594,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49571795855893933577921524029835245282146546810145996898,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
    ```
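
    As a final cross-check (an assumed verification step on top of the results above, not part of the original test), the resulting shard lineage can be compared against these logs with DescribeStream:
    ```java
    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
    import com.amazonaws.services.kinesis.model.Shard;

    public class PrintShardLineage {
        public static void main(String[] args) {
            AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
            for (Shard shard : kinesis.describeStream("mux_video_events_staging")
                    .getStreamDescription().getShards()) {
                // Closed parents (000, 001, 002) have an ending sequence number;
                // the four open children (003-006) do not.
                System.out.printf("%s parent=%s open=%b%n",
                        shard.getShardId(),
                        shard.getParentShardId(),
                        shard.getSequenceNumberRange().getEndingSequenceNumber() == null);
            }
        }
    }
    ```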


> Re-enable transparent reshard handling in Kinesis Consumer
> ----------------------------------------------------------
>
>                 Key: FLINK-4577
>                 URL: https://issues.apache.org/jira/browse/FLINK-4577
>             Project: Flink
>          Issue Type: Task
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>
> In FLINK-4341, we disabled transparent reshard handling in the Kinesis 
> consumer as a short-term workaround before FLINK-4576 comes around.
> This ticket tracks the progress of re-enabling it again, by implementing a 
> {{LowWatermarkListener}} interface as described in FLINK-4576.


