[ https://issues.apache.org/jira/browse/FLINK-21080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385258#comment-17385258 ]

Yun Gao commented on FLINK-21080:
---------------------------------

Hi Piotr, sorry for the rushed reply. There still seems to be one issue, which I'll 
detail at the end:
{quote}A note. BroadcastState I think is something else compared to the 
UnionListState, is it? Also with UnionListState it doesn't necessarily have to be 
only the single subtaskId == 0 subtask writing/updating the state. There might be 
cases where there are many writers.
{quote}
Yes indeed, BroadcastState should also be a problem: currently when repartitioning 
the state, the new subtask _i_ would fetch the state of subtask _i_ % 
previousParallelism, so if some of the tasks have finished, their state would be 
empty and thus cause problems. We might do a workaround for BroadcastState similar 
to the one for UnionListState, but perhaps we could also change the repartition 
method, since logically the broadcast state of all the subtasks is the same and a 
restarted task could be restored from any subtask (see the sketch below). And for 
UnionListState there would indeed be other cases, and we would consider them as a 
whole in the following~
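To make the idea concrete, here is a minimal sketch of such a changed repartition 
method. It is not Flink's actual repartitioner; the generic StateHandle type and 
the method names are placeholders for illustration only. The point is that any 
non-empty previous subtask can serve as the source for every new subtask, since 
the broadcast state of every subtask is logically identical:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates skipping finished (empty) subtasks when
// repartitioning broadcast state; StateHandle is a hypothetical placeholder.
public class BroadcastStateRepartitioner<StateHandle> {

    public List<StateHandle> repartition(List<StateHandle> previousStates, int newParallelism) {
        // Pick the first non-empty state; finished subtasks reported empty (null) state.
        StateHandle nonEmpty = null;
        for (StateHandle handle : previousStates) {
            if (handle != null) {
                nonEmpty = handle;
                break;
            }
        }

        List<StateHandle> newStates = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            // Instead of previousStates.get(i % previousStates.size()), which may hand
            // an empty state to a restored subtask, give every new subtask the same
            // (logically identical) non-empty copy.
            newStates.add(nonEmpty);
        }
        return newStates;
    }
}
{code}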
{quote}FlinkKafkaProducer is using UnionListState. So it wouldn't be as uncommon 
as you think?

Let's see if maybe others will come with different proposals. Also it would 
be nice to estimate the complexity of supporting finished subtasks with 
UnionListState, but I would lean towards an easy stop-gap solution (aborting 
checkpoints if UnionListState is detected with finished subtasks) to unblock 
this FLIP, and maybe improve it incrementally.
{quote}
Got it, I also agree that aborting checkpoints may mainly serve as a temporary 
solution and that we could seek more sophisticated solutions later. For keeping 
the remaining state after a subtask finishes, we previously had a thought in this 
direction: if a snapshot of one task is taken after we have called 
operator.finish(), we would keep referring to that snapshot in the following 
checkpoints, and we would need reference counting to handle the release of the 
state objects. The complexity mainly comes from the reference-count management on 
the CheckpointCoordinator side. But we would indeed need a more detailed 
estimation and a comparison to other options for keeping the remaining state.
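To illustrate the bookkeeping this would require, here is a rough sketch of a 
reference-counting registry on the CheckpointCoordinator side. All class and 
method names are hypothetical, not existing Flink APIs; it only shows why the 
complexity concentrates in the release path:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch only: tracks how many checkpoints still reference the retained
// snapshot of a finished task, and discards it once the count drops to zero.
public class RetainedStateRegistry {

    private final Map<String, Integer> referenceCounts = new HashMap<>();

    /** Called when a new checkpoint re-references the snapshot of a finished task. */
    public synchronized void retain(String stateHandleId) {
        referenceCounts.merge(stateHandleId, 1, Integer::sum);
    }

    /** Called when a checkpoint that referenced the snapshot is subsumed or discarded. */
    public synchronized void release(String stateHandleId) {
        Integer remaining = referenceCounts.computeIfPresent(stateHandleId, (id, count) -> count - 1);
        if (remaining != null && remaining == 0) {
            referenceCounts.remove(stateHandleId);
            discardPhysicalState(stateHandleId);
        }
    }

    private void discardPhysicalState(String stateHandleId) {
        // Delete the underlying files/objects once no checkpoint references them anymore.
    }
}
{code}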

As for the checkpoint-aborting method, there might still be one issue if the 
_notifyCheckpointComplete_ message gets lost: by aborting checkpoints in which 
only part of the tasks report _isFinished_, we in fact force the fast subtasks to 
wait for the slow tasks, and the fast subtasks would block waiting for the final 
checkpoint. Eventually we would have a checkpoint in which all the subtasks 
report _isFinished = true_, and we could rely on this checkpoint to unblock all 
the subtasks. If all the _notifyCheckpointComplete_ messages succeed, then all 
the subtasks finish normally. But if some of the _notifyCheckpointComplete_ 
messages are lost, the remaining tasks would still be blocked, and since all the 
following checkpoints would fail because part of the tasks are finished (a state 
would still be reported in the snapshot even if it has no elements), those tasks 
would block forever.

Thus, would it be possible for us to consider making _notifyCheckpointComplete_ 
reliable with a mechanism like retrying? Logically we could also avoid repeated 
notifications by filtering them on the task side, so it should not cause 
additional issues. It could also help us with other scenarios: currently a 
savepoint would also be blocked if _notifyCheckpointComplete_ gets lost, and the 
job then relies on the client-side timeout to be restarted. By making the 
notification reliable we may avoid some of these failovers.
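As a sketch of the task-side filtering mentioned above (with hypothetical names, 
not an existing Flink class), duplicate deliveries caused by retries could be 
dropped simply by remembering the highest checkpoint id that was already 
notified:
{code:java}
// Sketch only: deduplicates notifyCheckpointComplete deliveries on the task side,
// so that retrying the notification on the coordinator side stays harmless.
public class CheckpointNotificationFilter {

    private long lastNotifiedCheckpointId = -1L;

    /** Returns true if the notification should be processed, false if it is a duplicate. */
    public synchronized boolean shouldProcess(long checkpointId) {
        if (checkpointId <= lastNotifiedCheckpointId) {
            return false; // duplicate delivery from a retry, ignore it
        }
        lastNotifiedCheckpointId = checkpointId;
        return true;
    }
}
{code}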

If we have concerns about making _notifyCheckpointComplete_ reliable in general, 
then perhaps, as part of the temporary fix, would it be ok for us to first make 
_notifyCheckpointComplete_ reliable at least for checkpoints in which all 
operators report _isFinished = true_ states? We could identify whether a 
checkpoint satisfies this condition when finalizing it.
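A small sketch of that check, again with hypothetical types, could look as 
follows: when finalizing a checkpoint, we verify whether every reported subtask 
snapshot carries _isFinished = true_, and only then enable the reliable 
notification path:
{code:java}
import java.util.Collection;

// Sketch only: decides on finalization whether a checkpoint is "fully finished".
public class FullyFinishedCheckpointCheck {

    /** Hypothetical view of one subtask's reported snapshot. */
    public interface SubtaskSnapshot {
        boolean isFinished();
    }

    public static boolean allSubtasksFinished(Collection<SubtaskSnapshot> snapshots) {
        return snapshots.stream().allMatch(SubtaskSnapshot::isFinished);
    }
}
{code}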

With this fix, for operators using UnionListState like FlinkKafkaProducer, the 
overall job execution time hopefully would not be lengthened, and users would 
only see several aborted checkpoints. There might still be a problem if some 
subtasks run much longer than others, but that should not happen too frequently.

> Identify JobVertex containing legacy source operators and abort checkpoint 
> with legacy source operators partially finished
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21080
>                 URL: https://issues.apache.org/jira/browse/FLINK-21080
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Runtime / Checkpointing
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: auto-unassigned
>
> Most legacy source operators record the offset for each partition, and after 
> recovery they read from the recorded offsets. If some subtasks finish before a 
> checkpoint, the corresponding partition offsets are missing from the 
> checkpoint. Then, if the job recovers from this checkpoint, the legacy source 
> would re-discover all the partitions, and for the partitions of those finished 
> tasks it would re-read them from the beginning since their offsets were not 
> recorded. 
> Therefore, we would like to fail the checkpoint if some legacy source 
> operators have only part of their subtasks finished. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
