[ https://issues.apache.org/jira/browse/FLINK-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947987#comment-15947987 ]
ASF GitHub Bot commented on FLINK-4577:
---------------------------------------
Github user skidder commented on the issue:
https://github.com/apache/flink/pull/3458
@tzulitai I was able to scale up the number of shards on a Kinesis stream
without any interruption in processing. Looks good!
# Configuration
## Flink Cluster
A single Job Manager running as a standalone cluster, with a single Task
Manager providing 4 slots. Both Flink servers were built from the source of
this feature branch.
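A sketch of the corresponding standalone-cluster configuration (only the slot
count is stated above; the other values are placeholders):
```
# flink-conf.yaml (sketch; only taskmanager.numberOfTaskSlots is given above)
jobmanager.rpc.address: localhost
taskmanager.numberOfTaskSlots: 4
```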
## Application
A single Flink application running with a parallelism of 4
## Kinesis stream
Stream name `mux_video_events_staging` with one shard
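The test application itself isn't included in this comment; a minimal job
along these lines would exercise the same code path (region and credentials
are assumptions, everything else follows the setup above):
```
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReshardTestJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // one sub-task per task slot

        Properties consumerConfig = new Properties();
        // Assumption: the region is not stated in the comment; credentials come
        // from the default provider chain unless configured explicitly.
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        // Matches the LATEST_SEQUENCE_NUM starting state in the logs below.
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        env.addSource(new FlinkKinesisConsumer<>(
                "mux_video_events_staging",
                new SimpleStringSchema(),
                consumerConfig))
           .print();

        env.execute("FLINK-4577 reshard test");
    }
}
```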
# Test Steps & Results
On startup, the Flink application seeds Sub-task (1) with the stream's single
shard:
```
2017-03-29 15:55:03,306 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 1 will be seeded with initial shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49569797317567661038287361310393874557410775187880673282,}}'}, starting state
set as sequence number LATEST_SEQUENCE_NUM
2017-03-29 15:55:03,312 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 1 will start consuming seeded shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49569797317567661038287361310393874557410775187880673282,}}'} from sequence
number LATEST_SEQUENCE_NUM with ShardConsumer 0
```
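With a parallelism of 4 but only one shard, exactly one sub-task gets work.
The fetcher decides ownership by hashing the shard and taking the result
modulo the number of consumer subtasks; a paraphrase of that check (the exact
signature in `KinesisDataFetcher` may differ in this PR):
```
public class ShardAssignmentSketch {
    // A shard is consumed by the subtask whose index equals the shard's hash
    // modulo the total number of consumer subtasks, so each shard has exactly
    // one owner and the remaining subtasks stay idle until a reshard.
    static boolean shouldSubscribeTo(int shardHash, int totalSubtasks, int subtaskIndex) {
        return Math.abs(shardHash % totalSubtasks) == subtaskIndex;
    }
}
```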
Next, I increased the number of shards from 1 to 2 (a sketch of the reshard
call follows the log output below). Sub-task (1), previously responsible for
reading from the single shard, is marked as temporarily idle; Sub-tasks (2) &
(3) begin reading from the 2 new shards:
```
2017-03-29 17:36:36,741 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 3 has discovered a new shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000002,ParentShardId: shardId-000000000000,HashKeyRange:
{StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49571795731277369408532401914933651574587478731630051362,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 0
2017-03-29 17:36:38,606 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 1 has reached the end of subscribed shard:
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49569797317567661038287361310393874557410775187880673282,}}'}
2017-03-29 17:36:38,606 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 1 has reached the end of all currently subscribed shards; marking the
subtask as temporarily idle ...
2017-03-29 17:36:45,240 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 2 has discovered a new shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000001,ParentShardId: shardId-000000000000,HashKeyRange:
{StartingHashKey: 0,EndingHashKey:
170141183460469231731687303715884105728},SequenceNumberRange:
{StartingSequenceNumber:
49571795731255068663333871291792115856314830370124070930,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 0
```
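For reference, a uniform 1-to-2 split like the one above can be triggered with
the Kinesis `UpdateShardCount` API; a sketch using the AWS Java SDK (the
comment doesn't say whether the CLI, console, or SDK was actually used):
```
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

public class Reshard {
    public static void main(String[] args) {
        // Uses the default credential/region provider chain.
        AmazonKinesisClient kinesis = new AmazonKinesisClient();
        // UNIFORM_SCALING splits the single parent shard into two children,
        // matching the ParentShardId entries in the log output above.
        kinesis.updateShardCount(new UpdateShardCountRequest()
                .withStreamName("mux_video_events_staging")
                .withTargetShardCount(2)
                .withScalingType(ScalingType.UNIFORM_SCALING));
    }
}
```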
I then increased the number of shards from 2 to 4. Sub-task (0) reads from
a new shard; Sub-tasks (2) & (3) stop reading from their closed shards and
begin reading from the new shards; Sub-task (1), which was previously marked
as temporarily idle, starts reading from a new shard:
```
2017-03-29 17:41:58,005 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 2 has reached the end of subscribed shard:
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000001,ParentShardId: shardId-000000000000,HashKeyRange:
{StartingHashKey: 0,EndingHashKey:
170141183460469231731687303715884105728},SequenceNumberRange:
{StartingSequenceNumber:
49571795731255068663333871291792115856314830370124070930,}}'}
2017-03-29 17:41:58,393 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 1 has discovered a new shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000004,ParentShardId: shardId-000000000001,HashKeyRange:
{StartingHashKey: 85070591730234615865843651857942052865,EndingHashKey:
170141183460469231731687303715884105728},SequenceNumberRange:
{StartingSequenceNumber:
49571795845144974392229763675615029074730034502679134274,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 1
2017-03-29 17:41:59,602 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 0 has discovered a new shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000003,ParentShardId: shardId-000000000001,HashKeyRange:
{StartingHashKey: 0,EndingHashKey:
85070591730234615865843651857942052864},SequenceNumberRange:
{StartingSequenceNumber:
49571795845122673647031233052473493356457386141173153842,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 0
2017-03-29 17:42:28,173 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 3 has reached the end of subscribed shard:
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000002,ParentShardId: shardId-000000000000,HashKeyRange:
{StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49571795731277369408532401914933651574587478731630051362,}}'}
2017-03-29 17:42:29,520 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 2 has discovered a new shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000005,ParentShardId: shardId-000000000002,HashKeyRange:
{StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey:
255211775190703847597530955573826158593},SequenceNumberRange:
{StartingSequenceNumber:
49571795855871632832722993406693709563873898448640016466,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 1
2017-03-29 17:42:31,183 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
Subtask 3 has discovered a new shard
KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId:
shardId-000000000006,ParentShardId: shardId-000000000002,HashKeyRange:
{StartingHashKey: 255211775190703847597530955573826158594,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49571795855893933577921524029835245282146546810145996898,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 1
```
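The child shards above are picked up within seconds of each split because the
fetcher runs a periodic shard-discovery loop. Its poll interval is
configurable; continuing the `consumerConfig` sketch from the Configuration
section (10000 ms is the default):
```
// The fetcher periodically polls the stream for new shards, which is why the
// children of a split are discovered and consumed without a job restart.
consumerConfig.setProperty(
        ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "10000");
```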
> Re-enable transparent reshard handling in Kinesis Consumer
> ----------------------------------------------------------
>
> Key: FLINK-4577
> URL: https://issues.apache.org/jira/browse/FLINK-4577
> Project: Flink
> Issue Type: Task
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
>
> In FLINK-4341, we disabled transparent reshard handling in the Kinesis
> consumer as a short-term workaround before FLINK-4576 comes around.
> This ticket tracks the progress of re-enabling it again, by implementing a
> {{LowWatermarkListener}} interface as described in FLINK-4576.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)