GitHub user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3001
  
    @tony810430 (cc @StephanEwen, f.y.i.)
    On a second, closer look, I'm afraid this PR can't be merged as is. The 
problem is that the state redistribution of `ListCheckpointed` doesn't work 
with the Kinesis consumer's current shard discovery mechanism.
    
    On restore, each subtask uses the restored state it receives to set its 
"last seen shard ID". With this value set, the subtask only discovers shards 
after the "last seen shard ID". The subtask then determines which of the newly 
discovered shards it is responsible for consuming, using a simple modulo 
operation on the shards' hash values.
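
    To make the discovery step concrete, here is a rough sketch of the 
per-subtask logic described above. It is not the actual `FlinkKinesisConsumer` 
code: the class and method names are hypothetical, shard IDs are assumed to sort 
lexicographically in creation order (e.g. `shardId-000000000000`, 
`shardId-000000000001`), and `Math.floorMod` on `String.hashCode()` stands in 
for whatever hash the consumer really uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real FlinkKinesisConsumer implementation.
class ShardDiscoverySketch {

    // Returns the shards "after" the last seen shard ID. Assumes shard IDs sort
    // lexicographically in the order the shards were created.
    static List<String> discoverNewShards(List<String> allShardIds, String lastSeenShardId) {
        List<String> newShards = new ArrayList<>();
        for (String shardId : allShardIds) {
            if (lastSeenShardId == null || shardId.compareTo(lastSeenShardId) > 0) {
                newShards.add(shardId);
            }
        }
        return newShards;
    }

    // Modulo-on-hash assignment: a subtask consumes a shard only if the shard's
    // hash maps to its own subtask index (assumed hash: String.hashCode()).
    static boolean isResponsible(String shardId, int subtaskIndex, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    // What one subtask picks up from a single discovery round.
    static List<String> shardsToConsume(List<String> allShardIds, String lastSeenShardId,
                                        int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (String shardId : discoverNewShards(allShardIds, lastSeenShardId)) {
            if (isResponsible(shardId, subtaskIndex, parallelism)) {
                assigned.add(shardId);
            }
        }
        return assigned;
    }
}
```

With no restored state (`lastSeenShardId == null`), every shard is "new", and 
each one lands on exactly one subtask index.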
    
    This worked before, when restored state could not be redistributed, because 
each subtask was always restored with shards that belong to it (i.e. the same 
shards the modulo-on-hash assignment gives it).
    
    The state redistribution on restore for `ListCheckpointed` breaks this. For 
example:
    The job starts with only 1 subtask for the FlinkKinesisConsumer, and the 
Kinesis stream has 2 shards:
    subtask #1 --> shard1, shard2.
    
    After a restore with the parallelism increased to 2, let's say the list 
state gets redistributed as:
    subtask #1 --> shard1
    subtask #2 --> shard2
    
    Subtask #1's _last seen shard ID_ will be set to shard1, so it will 
afterwards discover shard2 as a new shard. If shard2 hashes to subtask #1, 
both subtasks end up consuming shard2.
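
    The conflict can be reproduced with a small, self-contained simulation. 
This is only a sketch: the names are hypothetical, a subtask's last seen shard 
ID is assumed to be the largest ID among its own restored shards, and the shard 
names `shard1` / `shard2` follow the example above (subtask #1 is index 0, 
subtask #2 is index 1).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the restore scenario; not real Flink code.
class RedistributionConflictSketch {

    // Assumed modulo-on-hash assignment, as in the discovery sketch.
    static boolean isResponsible(String shardId, int subtaskIndex, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    // Shards a subtask ends up consuming: its restored shards, plus any shard with
    // an ID greater than its *local* last seen ID that hashes to it.
    static List<String> consumedShards(List<String> restored, List<String> allShardIds,
                                       int subtaskIndex, int parallelism) {
        List<String> consumed = new ArrayList<>(restored);

        // Local "last seen" = largest shard ID in this subtask's restored state only.
        String lastSeen = null;
        for (String shardId : restored) {
            if (lastSeen == null || shardId.compareTo(lastSeen) > 0) {
                lastSeen = shardId;
            }
        }

        for (String shardId : allShardIds) {
            boolean isNew = lastSeen == null || shardId.compareTo(lastSeen) > 0;
            if (isNew && isResponsible(shardId, subtaskIndex, parallelism)) {
                consumed.add(shardId);
            }
        }
        return consumed;
    }
}
```

With `String.hashCode()` and parallelism 2, `shard2` maps to index 0, so 
subtask #1 (restored with `shard1`) rediscovers `shard2` while subtask #2 
(restored with `shard2`) keeps reading it: both consume `shard2`.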
    
    Changing the hashing / subtask-to-shard assignment used in shard discovery 
probably can't solve the problem, because however we change it, the result 
still depends on what the list state redistribution looks like.
    
    The only solution I can see would be to have merged state on restore, so 
that every subtask can set its "last seen shard ID" to the largest ID across 
all subtasks, not just the shards restored locally.
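
    For comparison, here is a sketch of how the merged-state idea could avoid 
the duplicate. It assumes every subtask receives the full union of all restored 
shards, keeps only the ones that hash to it, and sets its last seen shard ID to 
the global maximum. The names are hypothetical; this is not an existing Flink 
API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of restoring from merged (union) state; not real Flink code.
class MergedStateRestoreSketch {

    // Assumed modulo-on-hash assignment, as in the earlier sketches.
    static boolean isResponsible(String shardId, int subtaskIndex, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    // Global "last seen" shard ID: the largest ID across the merged state of all
    // subtasks, instead of only the locally restored shards.
    static String globalLastSeen(List<String> unionState) {
        String max = null;
        for (String shardId : unionState) {
            if (max == null || shardId.compareTo(max) > 0) {
                max = shardId;
            }
        }
        return max;
    }

    // Every subtask sees the whole union state, keeps the restored shards that hash
    // to it, and treats only shards after the global last seen ID as new.
    static List<String> consumedShards(List<String> unionState, List<String> allShardIds,
                                       int subtaskIndex, int parallelism) {
        List<String> consumed = new ArrayList<>();
        for (String shardId : unionState) {
            if (isResponsible(shardId, subtaskIndex, parallelism)) {
                consumed.add(shardId);
            }
        }
        String lastSeen = globalLastSeen(unionState);
        for (String shardId : allShardIds) {
            boolean isNew = lastSeen == null || shardId.compareTo(lastSeen) > 0;
            if (isNew && isResponsible(shardId, subtaskIndex, parallelism)) {
                consumed.add(shardId);
            }
        }
        return consumed;
    }
}
```

Because all subtasks derive ownership from the same merged view, the outcome no 
longer depends on how the list state happened to be split: each restored or 
newly discovered shard is consumed by exactly one subtask.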
    
    In FLIP-8 I see the community has also discussed an interface for merged 
state (a union list state on restore). I think that would be really useful in 
this particular case. It'll also be relevant for the Kafka connector; right now 
it only seems irrelevant because the Kafka consumer doesn't have partition 
discovery yet.
    @StefanRRichter could you perhaps provide some insight on the merged state 
aspect? I'm not yet that familiar with the recent work and progress on 
repartitionable state.

