[
https://issues.apache.org/jira/browse/FLINK-21080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385258#comment-17385258
]
Yun Gao commented on FLINK-21080:
---------------------------------
Hi Piotr, sorry for the rushed reply; there still seems to be one remaining issue,
which I'll detail at the end:
{quote}A note. BroadcastState I think is something else compared to the
UnionListState, is it? Also with UnionListState it doesn't necessarily have to be
only the single subtaskId == 0 subtask writing/updating the state. There might be
cases where there are many writers.
{quote}
Yes indeed, the BroadcastState should also be a problem, since currently when
repartitioning the state, the new subtask _i_ fetches the state of old subtask
_i_ % previousParallelism; if some of the tasks have finished, their state would be
empty and thus cause problems. We might also do a workaround for BroadcastState
similar to the one for UnionListState, but perhaps we could also change the
repartition method, since logically the broadcast state of all the subtasks is the
same and a restarted task could restore from any subtask.
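As a rough illustration only (the class and method names here are hypothetical, not
the actual Flink state-assignment code), the modulo-based reassignment described
above versus restoring from any non-empty subtask could look like:
{code:java}
// Hypothetical sketch of broadcast-state reassignment on restore; the names are
// made up for illustration and do not refer to Flink's real internals.
import java.util.List;

class BroadcastStateReassignmentSketch {

    /** Current behavior described above: new subtask i reads old subtask
     *  i % previousParallelism, which may be empty if that old subtask finished. */
    static byte[] assignByModulo(List<byte[]> oldSubtaskStates, int newSubtaskIndex) {
        int previousParallelism = oldSubtaskStates.size();
        return oldSubtaskStates.get(newSubtaskIndex % previousParallelism);
    }

    /** Possible alternative: since broadcast state is logically identical on every
     *  subtask, any new subtask could restore from any non-empty copy. */
    static byte[] assignFromAnyNonEmpty(List<byte[]> oldSubtaskStates) {
        for (byte[] state : oldSubtaskStates) {
            if (state != null && state.length > 0) {
                return state;
            }
        }
        return new byte[0]; // every producer finished with empty state
    }
}
{code}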
And for the UnionListState there would indeed be other cases; we would consider
them as a whole in the following~
{quote}FlinkKafkaProducer is using UnionListState. So it wouldn't be as uncommon
as you think?
Let's see if maybe others will come up with different proposals. Also it would
be nice to estimate the complexity of supporting finished subtasks with
UnionListState, but I would lean towards an easy stop-gap solution (aborting
checkpoints if UnionListState is detected with finished subtasks) to unblock
this FLIP, and maybe improve it incrementally.
{quote}
Got it, I also agree that aborting the checkpoint may mainly serve as a temporary
solution and we could seek more sophisticated solutions later. For keeping the
remaining state after a subtask finishes, we previously also had a thought in this
direction: one solution we considered was that if a snapshot of a task is taken
after we have called operator.finish(), the following checkpoints would keep
referring to that snapshot, and we would need reference counting to deal with
releasing the state objects. The complexity mainly comes from the reference-count
management on the CheckpointCoordinator side. But we would indeed need a more
detailed estimation and a comparison to the other options for keeping the
remaining state.
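As a minimal sketch of the kind of reference counting this would require on the
coordinator side (a hypothetical registry, not Flink's actual SharedStateRegistry),
assuming each retained snapshot is identified by a handle id:
{code:java}
// Hypothetical reference-counting sketch for snapshots of finished subtasks that
// later checkpoints keep referring to; not Flink's actual implementation.
import java.util.HashMap;
import java.util.Map;

class RetainedSnapshotRegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Called when a new checkpoint re-references the finished subtask's last snapshot. */
    synchronized void retain(String stateHandleId) {
        refCounts.merge(stateHandleId, 1, Integer::sum);
    }

    /** Called when a checkpoint is subsumed or discarded; the underlying state
     *  may only be deleted once no checkpoint references it anymore. */
    synchronized boolean release(String stateHandleId) {
        int remaining = refCounts.merge(stateHandleId, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(stateHandleId);
            return true; // safe to discard the underlying state
        }
        return false;
    }
}
{code}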
And for the checkpoint-aborting method, there might still be one issue if the
_notifyCheckpointComplete_ message gets lost: by failing the checkpoints in which
only part of the tasks report _isFinished_, we in fact force the fast subtasks to
wait for the slow tasks, and the fast subtasks would block waiting for the final
checkpoint. Eventually we would have a checkpoint in which all the subtasks report
_isFinished = true_, and we could rely on this checkpoint to unblock all the
subtasks. If all the _notifyCheckpointComplete_ messages succeed, then all the
subtasks finish normally. But if some of the _notifyCheckpointComplete_ messages
get lost, the remaining tasks would still be blocked, and since all the following
checkpoints would fail because part of the tasks are already finished (a state
would still be reported in the snapshot even if it has no elements), those tasks
would block forever.
Thus would it be possible for us to consider making _notifyCheckpointComplete_
reliable with a mechanism like retrying? Logically we could avoid repeated
notifications by filtering them on the task side, so it should not cause additional
issues. It could also help us in other scenarios: currently a savepoint would also
block if _notifyCheckpointComplete_ gets lost, and right now it relies on the
timeout on the client side to restart the job. By making the notification reliable
we might avoid some of these failovers.
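A minimal sketch of how the task side could filter repeated notifications if the
coordinator retried them (a hypothetical helper, assuming checkpoint ids are
assigned in increasing order):
{code:java}
// Hypothetical task-side de-duplication of retried notifyCheckpointComplete
// messages; assumes checkpoint ids increase monotonically.
import java.util.concurrent.atomic.AtomicLong;

class CheckpointCompleteDeduplicatorSketch {
    private final AtomicLong latestNotifiedCheckpointId = new AtomicLong(-1L);

    /** Returns true only for the first notification of a given (or newer) checkpoint,
     *  so retried duplicates become no-ops on the task side. */
    boolean shouldProcess(long checkpointId) {
        long previous = latestNotifiedCheckpointId.get();
        while (checkpointId > previous) {
            if (latestNotifiedCheckpointId.compareAndSet(previous, checkpointId)) {
                return true;
            }
            previous = latestNotifiedCheckpointId.get();
        }
        return false; // duplicate or stale notification, ignore
    }
}
{code}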
If we have concerns about making _notifyCheckpointComplete_ reliable in general,
then perhaps, as part of the temporary fix, would it be ok for us to first make
_notifyCheckpointComplete_ reliable at least for checkpoints in which all operators
have _isFinished = true_ states? We could identify whether a checkpoint satisfies
this condition when finalizing it.
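The finalization-time check itself could be as simple as the following sketch
(hypothetical types, only to show where the condition would be evaluated):
{code:java}
// Hypothetical check run when finalizing a checkpoint: only a checkpoint in which
// every operator reports isFinished = true would get the reliable notification.
import java.util.Collection;

class FinalCheckpointCheckSketch {

    /** Stand-in for the per-operator flag discussed above. */
    interface OperatorStateSummary {
        boolean isFinished();
    }

    static boolean allOperatorsFinished(Collection<? extends OperatorStateSummary> operatorStates) {
        return operatorStates.stream().allMatch(OperatorStateSummary::isFinished);
    }
}
{code}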
With this fix, for operators using UnionListState like FlinkKafkaProducer,
hopefully it should not lengthen the whole job execution time, and users would only
see several aborted checkpoints. It might be a problem if some subtasks run much
longer than others, but that should not happen too frequently.
> Identify JobVertex containing legacy source operators and abort checkpoint
> with legacy source operators partially finished
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21080
> URL: https://issues.apache.org/jira/browse/FLINK-21080
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Runtime / Checkpointing
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: auto-unassigned
>
> Most legacy source operators record the offset for each partition, and after
> recovery they read from the recorded offsets. If some subtasks are finished
> before a checkpoint, the corresponding partition offsets would be missing from
> the checkpoint. Then if the job recovers from this checkpoint, the legacy source
> would re-discover all the partitions, and for those finished tasks the legacy
> source would re-read their partitions since their offsets were not recorded.
> Therefore, we would like to fail the checkpoint if some legacy source operators
> have part of their subtasks finished.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)