GitHub user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3001
@tony810430 (cc @StephanEwen, FYI)
On a second, closer look, I'm afraid this PR can't be merged as is. The
problem is that the state redistribution of `ListCheckpointed` doesn't work
with the Kinesis consumer's current shard discovery mechanism.
On restore, each subtask uses the restored state it receives to set its own
"last seen shard ID". With this value set, the subtask discovers only shards
that come after the "last seen shard ID". The subtask then determines which of
the newly discovered shards it is responsible for consuming, using a simple
modulo operation on the shards' hash values.
This worked before, when restored state could not be redistributed, because
each subtask was always restored with shards that belong to that subtask (i.e.,
as determined by the modulo-on-hash operation).
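Here is a minimal sketch of the discovery-plus-assignment logic described above. It is a hypothetical model, not the actual FlinkKinesisConsumer code. It makes several assumptions. The class and method names are made up for illustration. `String.hashCode()` combined with `Math.floorMod` stands in for the real shard hash. Shard IDs are assumed to sort lexicographically in creation order. Subtask indices are 0-based, so subtask #1 in the text is index 0. When every subtask starts from the same view, each shard ends up owned by exactly one subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the discovery logic; not the actual
// FlinkKinesisConsumer code. Subtask indices are 0-based.
public class ShardDiscoverySketch {

    // Assumed assignment rule: a shard belongs to the subtask its hash maps to
    // (String.hashCode() + Math.floorMod is a stand-in for the real hash).
    static boolean isAssignedTo(String shardId, int subtaskIndex, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    // Returns the shards a subtask picks up in one discovery round: only IDs
    // strictly after lastSeenShardId count as new (null = nothing seen yet),
    // and of those only the ones that hash to this subtask.
    // Assumes shard IDs sort lexicographically in creation order.
    static List<String> discoverNewShards(List<String> allShardIds, String lastSeenShardId,
                                          int subtaskIndex, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (String shardId : allShardIds) {
            boolean isNew = lastSeenShardId == null || shardId.compareTo(lastSeenShardId) > 0;
            if (isNew && isAssignedTo(shardId, subtaskIndex, parallelism)) {
                owned.add(shardId);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard1", "shard2", "shard3");
        // With no last-seen ID, every shard is picked up by exactly one subtask.
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + discoverNewShards(shards, null, subtask, 2));
        }
    }
}
```

The hash-based ownership check is what kept things consistent before. It only stays correct if restored shards were themselves placed according to the same rule.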
The state redistribution on restore for `ListCheckpointed` breaks this. For
example:
The job starts with only 1 subtask for FlinkKinesisConsumer, and the Kinesis
stream has 2 shards:
subtask #1 --> shard1, shard2
After a restore with parallelism increased to 2, let's say the list state
gets redistributed as:
subtask #1 --> shard1
subtask #2 --> shard2
Subtask #1's _last seen shard ID_ will be set to shard1, so it will discover
shard2 as a new shard afterwards. If shard2 gets hashed to subtask #1, both
subtasks will end up consuming shard2.
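The scenario above can be reproduced with a small simulation. It is a hypothetical sketch that reuses the same assumptions: a stand-in `String.hashCode()` / `Math.floorMod` hash, lexicographically ordered shard IDs, 0-based subtask indices, and made-up names. Each subtask derives its "last seen shard ID" only from its own redistributed list state. The simulation then records every subtask that would consume each shard, whether that shard came from restored state or from re-discovery.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical simulation of restore with redistributed (non-merged) list state.
public class RedistributionConflictSketch {

    // Same assumed modulo-on-hash assignment rule as the discovery logic.
    static int ownerOf(String shardId, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    // For each shard, collects the subtasks that would consume it after restore:
    // the subtask that received it via redistributed state, plus any subtask that
    // re-discovers it because the ID is after that subtask's LOCAL last-seen ID
    // and hashes to it.
    static Map<String, Set<Integer>> consumersAfterRestore(
            List<String> allShards, List<List<String>> restoredPerSubtask) {
        int parallelism = restoredPerSubtask.size();
        Map<String, Set<Integer>> consumers = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<String> restored = restoredPerSubtask.get(subtask);
            // Local-only view: last seen ID is the max of this subtask's own shards.
            String lastSeen = restored.isEmpty() ? null : Collections.max(restored);
            for (String shard : restored) {
                consumers.computeIfAbsent(shard, k -> new TreeSet<>()).add(subtask);
            }
            for (String shard : allShards) {
                boolean isNew = lastSeen == null || shard.compareTo(lastSeen) > 0;
                if (isNew && ownerOf(shard, parallelism) == subtask) {
                    consumers.computeIfAbsent(shard, k -> new TreeSet<>()).add(subtask);
                }
            }
        }
        return consumers;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard1", "shard2");
        // Redistribution from the example: index 0 gets shard1, index 1 gets shard2.
        List<List<String>> restored = Arrays.asList(
                Collections.singletonList("shard1"),
                Collections.singletonList("shard2"));
        System.out.println(consumersAfterRestore(shards, restored));
    }
}
```

With Java's `String.hashCode()`, "shard2" happens to map to index 0, so `main` prints `{shard1=[0], shard2=[0, 1]}`. In other words, shard2 is consumed by both subtasks.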
Changing the hashing (i.e., how the subtask-to-shard assignment is determined
during shard discovery) probably can't solve the problem, because no matter how
we change it, the outcome will still depend on what the list state
redistribution looks like.
The only way I can see to solve this would be to have merged state on
restore, so that every subtask can set its "last seen shard ID" to the largest
ID across all subtasks, not just the largest in its own local state.
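The effect of merged state can be sketched in the same hypothetical model. The sketch does not use any Flink API. It simply assumes that every subtask can see all restored shards, in the spirit of the union list state discussed in FLIP-8, and so can agree on a single global "last seen shard ID". Restored shards stay with the subtask they were redistributed to. Only shards created after that global ID (the made-up "shard3" here) go through hash-based discovery. The stand-in hash, the lexicographic ID ordering and the names are assumptions, as before.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical simulation of restore where the last-seen ID comes from a merged view.
public class UnionLastSeenSketch {

    // Same assumed modulo-on-hash assignment rule as the discovery logic.
    static int ownerOf(String shardId, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    static Map<String, Set<Integer>> consumersAfterRestore(
            List<String> allShards, List<List<String>> restoredPerSubtask) {
        int parallelism = restoredPerSubtask.size();
        // Merged view: all subtasks see every restored shard, so they all agree
        // on one global last-seen ID (the largest restored shard ID).
        String globalLastSeen = null;
        for (List<String> restored : restoredPerSubtask) {
            for (String shard : restored) {
                if (globalLastSeen == null || shard.compareTo(globalLastSeen) > 0) {
                    globalLastSeen = shard;
                }
            }
        }
        Map<String, Set<Integer>> consumers = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Restored shards stay with the subtask they were redistributed to.
            for (String shard : restoredPerSubtask.get(subtask)) {
                consumers.computeIfAbsent(shard, k -> new TreeSet<>()).add(subtask);
            }
            // Only shards after the GLOBAL last-seen ID are treated as new.
            for (String shard : allShards) {
                boolean isNew = globalLastSeen == null || shard.compareTo(globalLastSeen) > 0;
                if (isNew && ownerOf(shard, parallelism) == subtask) {
                    consumers.computeIfAbsent(shard, k -> new TreeSet<>()).add(subtask);
                }
            }
        }
        return consumers;
    }

    public static void main(String[] args) {
        // shard3 is a hypothetical shard created after the restore.
        List<String> shards = Arrays.asList("shard1", "shard2", "shard3");
        List<List<String>> restored = Arrays.asList(
                Collections.singletonList("shard1"),
                Collections.singletonList("shard2"));
        System.out.println(consumersAfterRestore(shards, restored));
    }
}
```

Here `main` prints `{shard1=[0], shard2=[1], shard3=[1]}`, so every shard has exactly one consumer regardless of how the list state was redistributed.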
In FLIP-8 I see the community has also discussed an interface for merged state
(a union list state on restore). I think that will be really useful in this
particular case. It'll also be relevant for the Kafka connector; right now it
only seems irrelevant because the Kafka consumer doesn't have partition
discovery yet.
@StefanRRichter could you perhaps provide some insight on the merged state
aspect? I'm not yet that familiar with the recent work and progress on
repartitionable state.