[ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129274#comment-17129274
 ] 

Biao Liu commented on FLINK-14971:
----------------------------------

I spent some time going back over Stephan's summary.

The first step "(1) When the checkpoint is ready (all tasks acked, metadata 
written out), Checkpoint Coordinator transfers ownership to the 
CompletedCheckpointStore" sounds like a good idea to me. Once a checkpoint is 
ready, it can no longer be cancelled; otherwise we would need to think about 
how to revert the {{CompletedCheckpointStore}}. This simplifies the scenario a 
lot.

Let's focus on the second step. If I understand correctly, option (b) is a 
bit subtle. If the JM recovers from a checkpoint (N) which has not been 
persisted to ZK, and then the JM process is lost, the new JM would recover 
from checkpoint N-1. I'm not sure this is free of side effects on both the JM 
and TM sides. My gut feeling is that these are dangerous semantics and might 
break assumptions that some users rely on.

Option (a) looks like the most feasible one to me. It relies on a few facts; 
please correct me if I'm wrong.
 1. The asynchronous commit to {{CompletedCheckpointStore}} must finish 
first; only then may {{CheckpointCoordinator}} notify tasks that the 
checkpoint is completed. Otherwise the rule "NOTE: It is not fine to ignore it 
and start from an earlier checkpoint if it will get committed later. That is 
the bug to prevent" might be broken. The corner case is: 
{{CheckpointCoordinator}} first notifies tasks that checkpoint N is completed, 
then commits to ZK asynchronously, and the JM process is lost before the 
commit succeeds. A new JM process starts and recovers from checkpoint N-1, 
because N-1 is the last successful checkpoint recorded in ZK.
 2. If the job fails before the asynchronous commit completes, 
{{CheckpointCoordinator}} needs to decide how to handle the commit. By the 
time the commit completes, the JM might be busy restoring or performing other 
steps (like cancelling tasks). I see two options. Option A is to fail this 
checkpoint, revert {{CheckpointCoordinator}}, and not subsume older checkpoints 
(which is described in FLINK-16770). Option B is to treat this checkpoint as a 
successful one but not notify tasks: they are being cancelled or waiting to be 
restarted, so the notification would be meaningless. I think option B is 
simpler and better, and also acceptable, because the notification of 
checkpoint completion is not guaranteed anyway.
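
To make the ordering in the two points above concrete, here is a minimal 
sketch in Java. It is not Flink code: the class {{CommitThenNotifySketch}}, its 
methods, and the {{commitFuture}} parameter (standing in for the asynchronous 
ZK commit) are all hypothetical names used for illustration. It only shows the 
idea that the notification is chained after the commit, and that with option B 
a commit finishing after a job failure is still recorded but not announced to 
tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch; none of these names are real Flink APIs. */
public class CommitThenNotifySketch {
    // All state below is assumed to be touched only from the main thread executor.
    private final Executor mainThreadExecutor;
    private final List<Long> notifiedCheckpoints = new ArrayList<>();
    private long latestCommittedCheckpoint = -1L;
    private boolean jobRunning = true;

    public CommitThenNotifySketch(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    /**
     * Called when all tasks have acked and the metadata is written.
     * commitFuture stands in for the asynchronous commit to the store (ZK).
     */
    public CompletableFuture<Void> onCheckpointReady(
            long checkpointId, CompletableFuture<Void> commitFuture) {
        // Fact 1: nothing is announced until the commit has succeeded.
        // If the commit fails, this action is skipped and the state is unchanged.
        return commitFuture.thenRunAsync(() -> {
            latestCommittedCheckpoint = Math.max(latestCommittedCheckpoint, checkpointId);
            // Fact 2, option B: the checkpoint counts as successful, but tasks
            // that are cancelling or restarting are not notified.
            if (jobRunning) {
                notifiedCheckpoints.add(checkpointId);
            }
        }, mainThreadExecutor);
    }

    /** Assumed to be invoked from the main thread when the job fails. */
    public void onJobFailure() {
        jobRunning = false;
    }

    public long latestCommittedCheckpoint() {
        return latestCommittedCheckpoint;
    }

    public List<Long> notifiedCheckpoints() {
        return notifiedCheckpoints;
    }

    public static void main(String[] args) {
        // A direct executor keeps the example single-threaded and deterministic.
        CommitThenNotifySketch coordinator = new CommitThenNotifySketch(Runnable::run);

        CompletableFuture<Void> commit1 = new CompletableFuture<>();
        coordinator.onCheckpointReady(1L, commit1);
        commit1.complete(null);       // commit done, tasks are notified

        CompletableFuture<Void> commit2 = new CompletableFuture<>();
        coordinator.onCheckpointReady(2L, commit2);
        coordinator.onJobFailure();   // job fails while the commit is in flight
        commit2.complete(null);       // commit still lands, no notification

        System.out.println("latest committed: " + coordinator.latestCommittedCheckpoint());
        System.out.println("notified: " + coordinator.notifiedCheckpoints());
    }
}
```

In this sketch, checkpoint 2 becomes the latest committed checkpoint even 
though only checkpoint 1 was announced to tasks, so a recovering JM would never 
fall back behind a checkpoint that tasks were told is complete.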

> Make all the non-IO operations in CheckpointCoordinator single-threaded
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14971
>                 URL: https://issues.apache.org/jira/browse/FLINK-14971
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>            Reporter: Biao Liu
>            Assignee: Biao Liu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the handling of ACK and declined messages is executed in the IO 
> thread. This is the only remaining place where non-IO operations are 
> executed in the IO thread. It blocks introducing the main thread executor 
> for {{CheckpointCoordinator}} and will be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor 
> will be introduced into {{CheckpointCoordinator}} to replace the timer 
> thread. However, the timer thread will be kept (perhaps only temporarily) to 
> schedule periodic triggering, since FLINK-13848 has not been accepted yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)