[
https://issues.apache.org/jira/browse/FLINK-21080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385990#comment-17385990
]
Yun Gao edited comment on FLINK-21080 at 7/23/21, 8:04 AM:
-----------------------------------------------------------
Hi Till, many thanks for the thoughts! I agree about the broadcast state and
the union state, but perhaps the normal operator list state (namely the
SPLIT_DISTRIBUTE mode) would be OK?
In the first run, if some subtasks have finished, the problem currently comes
mainly from the fact that we do not store their state in the subsequent
checkpoints. Then, after restoring:
# If there is no rescaling and no topology change, the state assignment does
not change and the finished subtasks still do not receive new records, so there
should be no problem in this case.
# If there is rescaling or a topology change, the situation is similar to a
normal scale-up: we might have new subtasks that start from an empty state.
Overall, the remaining records should not rely on the discarded state, since
they could not access those pieces of state in the previous run either, and
there should be no problem if a record is assigned to another subtask, since
that also happens during normal rescaling.
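A minimal Java sketch of the argument in point 2, assuming a simplified model of SPLIT_DISTRIBUTE redistribution. The class and method names are illustrative only, and the round-robin assignment is not Flink's exact splitting algorithm. The entries from all snapshots are pooled and split across the new parallelism, so a subtask that finished before the checkpoint simply contributes nothing, and no restored subtask expects to find its entries.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (hypothetical, not Flink's implementation) of
 * SPLIT_DISTRIBUTE redistribution for operator list state.
 */
public class SplitRedistribution {

    // Each inner list is the state snapshotted by one subtask in the checkpoint.
    // A subtask that finished before the checkpoint contributes an empty list.
    static List<List<String>> redistribute(List<List<String>> snapshots, int newParallelism) {
        // Pool every entry that made it into the checkpoint.
        List<String> all = new ArrayList<>();
        for (List<String> s : snapshots) {
            all.addAll(s);
        }
        List<List<String>> assigned = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assigned.add(new ArrayList<>());
        }
        // Simple round-robin split; Flink's real assignment differs in detail.
        for (int i = 0; i < all.size(); i++) {
            assigned.get(i % newParallelism).add(all.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Subtask 1 finished before the checkpoint, so its state is absent.
        List<List<String>> snapshots = List.of(List.of("a", "b"), List.<String>of(), List.of("c"));
        // Rescale from 3 to 2 subtasks: each new subtask only sees what was stored.
        System.out.println(redistribute(snapshots, 2)); // [[a, c], [b]]
    }
}
```

Nothing in this model depends on the missing entries: each new subtask initializes from whatever was left in the checkpoint, exactly as in a normal scale-up.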
The union state is different: with normal list state, a subtask usually
initializes its state with whatever is left in the checkpoint, but with union
list state, it might rely on replication (as in the sink case, where every
subtask receives the full state) or on the difference (as in the source case,
where partitions missing from the restored state are treated as new). Thus, if
part of the state is lost, we would have a problem.
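For contrast, a minimal Java sketch of the source case under UNION redistribution. The names are hypothetical (not the API of any real Flink connector), and it assumes a partition without a recorded offset is read from offset 0. Every restored subtask sees the merged state and effectively computes "discovered partitions minus restored partitions", so a partition whose owner finished before the checkpoint looks new and is read again.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of a legacy source keeping partition offsets in union list state. */
public class UnionStateMinus {

    // UNION redistribution: every restored subtask sees the merged state of all
    // subtasks that contributed to the checkpoint.
    static Map<String, Long> unionRestore(List<Map<String, Long>> snapshots) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> s : snapshots) {
            merged.putAll(s);
        }
        return merged;
    }

    // The "minus" step: any discovered partition missing from the restored state
    // is assumed to be new and starts from the earliest offset (0 in this sketch).
    static Map<String, Long> startOffsets(Set<String> discovered, Map<String, Long> restored) {
        Map<String, Long> offsets = new TreeMap<>(restored);
        for (String partition : discovered) {
            offsets.putIfAbsent(partition, 0L);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Subtask 0 owns p0 and is still running; subtask 1 owned p1 and finished
        // before the checkpoint, so p1's offset never reached the checkpoint.
        List<Map<String, Long>> snapshots = List.of(Map.of("p0", 17L), Map.<String, Long>of());
        Map<String, Long> restored = unionRestore(snapshots);
        // p1 is treated as a new partition and re-read from the start.
        System.out.println(startOffsets(Set.of("p0", "p1"), restored)); // {p0=17, p1=0}
    }
}
```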
> Handle UnionListState with finished operators
> ---------------------------------------------
>
> Key: FLINK-21080
> URL: https://issues.apache.org/jira/browse/FLINK-21080
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Runtime / Checkpointing
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: auto-unassigned
>
> Most legacy source operators record the offset of each partition, and after
> recovery they resume reading from the recorded offsets. If some subtasks have
> finished before a checkpoint, the offsets of their partitions are missing
> from that checkpoint. If the job then recovers from this checkpoint, the
> legacy source rediscovers all the partitions and re-reads those that belonged
> to the finished subtasks, since their offsets were not recorded.
> Therefore, we would like to fail the checkpoint if any legacy source
> operator has only part of its subtasks finished.
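A minimal Java sketch of the proposed check (uses Java 16+ records). `OperatorStatus` and `shouldFailCheckpoint` are hypothetical names, not part of Flink's checkpointing API. Following the description, it only rejects a checkpoint when a legacy source operator is partially finished; an operator whose subtasks have all finished, or a non-source operator, does not trigger it.

```java
import java.util.List;

/** Hypothetical sketch: decline a checkpoint for partially finished legacy sources. */
public class PartiallyFinishedSourceCheck {

    // Illustrative per-operator summary; not a real Flink type.
    record OperatorStatus(String name, boolean legacySource, int parallelism, int finishedSubtasks) {}

    static boolean shouldFailCheckpoint(List<OperatorStatus> operators) {
        for (OperatorStatus op : operators) {
            // "Part of subtasks finished": at least one, but not all.
            boolean partiallyFinished =
                    op.finishedSubtasks() > 0 && op.finishedSubtasks() < op.parallelism();
            if (op.legacySource() && partiallyFinished) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<OperatorStatus> ops = List.of(
                new OperatorStatus("legacy-source", true, 4, 1),
                new OperatorStatus("map", false, 4, 2));
        System.out.println(shouldFailCheckpoint(ops)); // true
    }
}
```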