[
https://issues.apache.org/jira/browse/FLINK-21080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384869#comment-17384869
]
Yun Gao commented on FLINK-21080:
---------------------------------
Hi Piotr, many thanks for pointing out this issue, and sorry for missing it
initially. I agree that the issue exists: with union list state, an operator
may rely on a single subtask to write the value in order to implement a
broadcast-state pattern, but if that subtask is finished, this piece of
information is lost.
Regarding the solution, I also agree that the two options would work, although
keeping the union list state for finished subtasks does seem to add some
complexity. Would it be OK to consider another option, in line with the
initial idea of this issue: abort the checkpoint if we find that an operator
uses union list state but some of its subtasks are finished?
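A minimal, Flink-free simulation may make the failure mode concrete. It assumes that union redistribution concatenates every subtask's reported list and hands the full list to every restored subtask; the class UnionListStateSim and its methods are hypothetical illustrations, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, Flink-free simulation of union list state redistribution.
 * Each subtask reports a list of entries at checkpoint time; on restore,
 * every new subtask receives the union (concatenation) of all reported lists.
 */
final class UnionListStateSim {

    /** Snapshot: subtask index -> entries that subtask wrote into its union list state. */
    static Map<Integer, List<String>> snapshot(Map<Integer, List<String>> liveState,
                                               List<Integer> finishedSubtasks) {
        Map<Integer, List<String>> reported = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> e : liveState.entrySet()) {
            // Finished subtasks no longer take part in the checkpoint,
            // so whatever they held is simply not reported.
            if (!finishedSubtasks.contains(e.getKey())) {
                reported.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        return reported;
    }

    /** Restore: every subtask sees the same union of all reported entries. */
    static List<String> restoreUnion(Map<Integer, List<String>> reported) {
        List<String> union = new ArrayList<>();
        for (List<String> entries : reported.values()) {
            union.addAll(entries);
        }
        return Collections.unmodifiableList(union);
    }
}
```

With subtask 0 as the only writer, restoring from a checkpoint taken after subtask 0 had finished yields an empty union on every subtask, which is the lost broadcast value described above.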
In detail, we might:
1. Have a task report a TaskStateSnapshot with isFinished = true once finish()
has been called on all of its operators.
2. Record in the pending checkpoint, in some way, how many finished subtask
states have been received for each operator.
3. When the pending checkpoint is fully acknowledged, iterate over all the
OperatorStates. If an OperatorState has some of its subtasks finished, iterate
over all of its reported operator state handles to check whether any of them
contains union list state (the StateInfo seems to carry the distribution
mode). If so, abort the checkpoint.
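The three steps could be sketched roughly as follows. This is a simplified model rather than Flink's checkpoint coordinator code: OperatorStateSummary, DistributionMode and UnionStateCheckpointGuard are hypothetical types standing in for the per-operator bookkeeping of step 2 and the mode information the StateInfo is said to carry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the distribution mode recorded per operator state. */
enum DistributionMode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

/** Hypothetical per-operator summary collected by a pending checkpoint (steps 1 and 2). */
final class OperatorStateSummary {
    final String operatorId;
    final int parallelism;
    final int finishedSubtasks;                  // subtasks that reported isFinished = true
    final List<DistributionMode> reportedModes;  // modes of the reported operator state handles

    OperatorStateSummary(String operatorId, int parallelism, int finishedSubtasks,
                         List<DistributionMode> reportedModes) {
        this.operatorId = operatorId;
        this.parallelism = parallelism;
        this.finishedSubtasks = finishedSubtasks;
        this.reportedModes = reportedModes;
    }

    /** True when some, but not all, subtasks of this operator are finished. */
    boolean isPartiallyFinished() {
        return finishedSubtasks > 0 && finishedSubtasks < parallelism;
    }

    boolean usesUnionListState() {
        return reportedModes.contains(DistributionMode.UNION);
    }
}

final class UnionStateCheckpointGuard {
    /**
     * Step 3: once the checkpoint is fully acknowledged, collect the ids of operators
     * that use union list state while only part of their subtasks are finished.
     */
    static List<String> findViolations(List<OperatorStateSummary> operators) {
        List<String> violations = new ArrayList<>();
        for (OperatorStateSummary op : operators) {
            if (op.isPartiallyFinished() && op.usesUnionListState()) {
                violations.add(op.operatorId);
            }
        }
        return violations;
    }

    /** A non-empty violation list means the checkpoint should be aborted. */
    static boolean shouldAbort(List<OperatorStateSummary> operators) {
        return !findViolations(operators).isEmpty();
    }
}
```

An operator whose subtasks are all finished is deliberately not flagged, since the proposal only targets operators with part of their subtasks finished.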
The probability of hitting this case (an operator uses union list state, some
of its subtasks have finished, and a checkpoint is taken at that point) seems
fairly low, so perhaps aborting the checkpoint in this case would not cause
too much confusion for users?
> Identify JobVertex containing legacy source operators and abort checkpoint
> with legacy source operators partially finished
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21080
> URL: https://issues.apache.org/jira/browse/FLINK-21080
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Runtime / Checkpointing
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: auto-unassigned
>
> Most legacy source operators record the offset for each partition, and after
> recovery they resume reading from the recorded offsets. If some subtasks
> finish before a checkpoint, the corresponding partition offsets are dropped
> from the checkpoint. If the job then recovers from this checkpoint, the
> legacy source rediscovers all the partitions and, for the partitions owned by
> the finished tasks, re-reads them, since their offsets were not recorded.
> Therefore, we would like to fail the checkpoint if a legacy source operator
> has only part of its subtasks finished.
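The offset loss described in the issue can be modeled in a few lines. This is a hypothetical sketch, not code from any Flink connector; it assumes that each subtask owns a fixed set of partitions and that a partition without a restored offset is consumed again from offset 0.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a legacy source that checkpoints one offset per partition.
 * Assumption: a partition with no restored offset is read again from offset 0.
 */
final class LegacySourceOffsetSim {

    /** Checkpoint: collect offsets only from subtasks that are still running. */
    static Map<String, Long> checkpoint(Map<Integer, Map<String, Long>> offsetsBySubtask,
                                        List<Integer> finishedSubtasks) {
        Map<String, Long> recorded = new HashMap<>();
        offsetsBySubtask.forEach((subtask, offsets) -> {
            // A finished subtask does not contribute its partition offsets.
            if (!finishedSubtasks.contains(subtask)) {
                recorded.putAll(offsets);
            }
        });
        return recorded;
    }

    /** Recovery: rediscover all partitions and pick the start offset for each. */
    static Map<String, Long> startOffsets(List<String> discoveredPartitions,
                                          Map<String, Long> recorded) {
        Map<String, Long> start = new HashMap<>();
        for (String partition : discoveredPartitions) {
            // No recorded offset: the partition is consumed again from the start.
            start.put(partition, recorded.getOrDefault(partition, 0L));
        }
        return start;
    }
}
```

If subtask 0 fully read p0 and finished before the checkpoint, recovery starts p0 at 0 again, which is the duplicate read that failing the checkpoint is meant to prevent.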
--
This message was sent by Atlassian Jira
(v8.3.4#803005)