[ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125552#comment-17125552
 ] 

Piotr Nowojski commented on FLINK-14971:
----------------------------------------

Carrying the discussion over from FLINK-16770
{quote}
Two steps to a cleaner solution
(1) When the checkpoint is ready (all tasks acked, metadata written out), 
Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.

That means the Checkpoint is removed from the "Pending Checkpoints" map and 
added to the CompletedCheckpointStore in one call in the main thread. If this 
is in one call, it is atomic against other modifications (cancellation, 
disposing checkpoints). Because the checkpoint is removed from the "Pending 
Checkpoints" map (not owned by the coordinator any more) it will not get 
cancelled during shutdown of the coordinator.
    ==> This is a very simple change

 

(2) The addition to the CompletedCheckpointStore must be constant time and 
executed in the main thread

That means that the CompletedCheckpointStore would put the Completed Checkpoint 
into a local list and then kick off the asynchronous request to add it to ZK.
If the JM looks up the latest checkpoint, it refers to that local list. That 
way all local components refer to the same status and do not exchange status 
asynchronously via an external system (ZK).
==> The change is that the CompletedCheckpointStore would not always repopulate 
itself from ZK upon "restore checkpoint", but keep the local state and only 
repopulate itself when the master gains leader status (and clears itself when 
leader status is lost).

==> This is a slightly more complex change, but not too big.
{quote}
Regarding the first step: executing {{CompletedCheckpointStore#addCheckpoint}} in 
the main thread is probably not a good idea, because it could block the JM's 
main thread for quite long periods of time. I think this call currently also 
cleans up older checkpoints. Checkpoint cleanup is already causing us some 
problems while it's happening in the IO threads: FLINK-17073, FLINK-17860.

We would have to figure out whether transferring ownership can happen in the 
main thread while the cleanup runs asynchronously. I think this is what you 
mean by your 2nd step, [~sewen]?
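
To make the question concrete, here is a minimal sketch of that split. It is not Flink code: {{CoordinatorSketch}}, {{LocalFirstCheckpointStore}}, {{persistToHaStore}} and {{discard}} are hypothetical names, checkpoints are reduced to bare ids, and the ZK write is a no-op placeholder. The assumption is that {{trigger}}, {{complete}} and {{shutdown}} are only ever called from the main thread, so the ownership transfer is atomic with respect to them, while the slow work runs on an IO executor.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a store that keeps local state first (not Flink's API). */
class LocalFirstCheckpointStore {
    // Local view of completed checkpoints; only touched from the main thread.
    private final Deque<Long> completed = new ArrayDeque<>();
    // Ids whose state was discarded; written from the IO executor.
    private final List<Long> discarded = new CopyOnWriteArrayList<>();
    private final int maxRetained;
    private final Executor ioExecutor;

    LocalFirstCheckpointStore(int maxRetained, Executor ioExecutor) {
        this.maxRetained = maxRetained;
        this.ioExecutor = ioExecutor;
    }

    /** Constant-time bookkeeping in the caller's thread; slow work goes to ioExecutor. */
    CompletableFuture<Void> add(long checkpointId) {
        completed.addLast(checkpointId);
        // Decide in the main thread which checkpoint is subsumed, if any.
        Long subsumed = completed.size() > maxRetained ? completed.removeFirst() : null;
        return CompletableFuture.runAsync(() -> {
            persistToHaStore(checkpointId);
            if (subsumed != null) {
                discard(subsumed);
            }
        }, ioExecutor);
    }

    /** Latest completed checkpoint according to the local list, or null if none. */
    Long latest() {
        return completed.peekLast();
    }

    List<Long> discarded() {
        return discarded;
    }

    private void persistToHaStore(long checkpointId) {
        // Placeholder for the asynchronous write to ZK.
    }

    private void discard(long checkpointId) {
        // Placeholder for deleting checkpoint state; here we only record the id.
        discarded.add(checkpointId);
    }
}

/** Hypothetical coordinator; all methods are assumed to run in the main thread. */
class CoordinatorSketch {
    private final Map<Long, String> pending = new HashMap<>();
    private final LocalFirstCheckpointStore store;

    CoordinatorSketch(LocalFirstCheckpointStore store) {
        this.store = store;
    }

    void trigger(long checkpointId) {
        pending.put(checkpointId, "pending");
    }

    /** Ownership transfer: remove from pending and add to the store in one call. */
    CompletableFuture<Void> complete(long checkpointId) {
        if (pending.remove(checkpointId) == null) {
            throw new IllegalStateException("Unknown checkpoint " + checkpointId);
        }
        return store.add(checkpointId);
    }

    /** Cancels only what is still pending; returns how many were cancelled. */
    int shutdown() {
        int cancelled = pending.size();
        pending.clear();
        return cancelled;
    }
}
```

In this sketch a completed checkpoint is no longer in the pending map when {{shutdown}} runs, so it cannot be cancelled there, and {{latest}} answers from the local list without waiting for ZK. What the sketch leaves open is failure handling when the asynchronous write fails after the local list was already updated.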

> Make all the non-IO operations in CheckpointCoordinator single-threaded
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14971
>                 URL: https://issues.apache.org/jira/browse/FLINK-14971
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>            Reporter: Biao Liu
>            Assignee: Biao Liu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling is executed in the IO thread. 
> This is the only remaining place where non-IO operations are executed in the 
> IO thread. It blocks introducing the main thread executor for 
> {{CheckpointCoordinator}}. It will be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor 
> will be introduced into {{CheckpointCoordinator}} to replace the timer 
> thread. However, the timer thread will be kept (maybe only temporarily) to 
> schedule periodic triggering, since FLINK-13848 has not been accepted yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
