[
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125552#comment-17125552
]
Piotr Nowojski commented on FLINK-14971:
----------------------------------------
Carrying the discussion over from FLINK-16770
{quote}
Two steps to a cleaner solution
(1) When the checkpoint is ready (all tasks acked, metadata written out),
Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.
That means the Checkpoint is removed from the "Pending Checkpoints" map and
added to the CompletedCheckpointStore in one call in the main thread. If this
is in one call, it is atomic against other modifications (cancellation,
disposing checkpoints). Because the checkpoint is removed from the "Pending
Checkpoints" map (not owned by the coordinator any more) it will not get
cancelled during shutdown of the coordinator.
==> This is a very simple change
(2) The addition to the CompletedCheckpointStore must be constant time and
executed in the main thread
That means that the CompletedCheckpointStore would put the Completed Checkpoint
into a local list and then kick off the asynchronous request to add it to ZK.
If the JM looks up the latest checkpoint, it refers to that local list. That
way all local components refer to the same status and do not exchange status
asynchronously via an external system (ZK).
==> The change is that the CompletedCheckpointStore would not always repopulate
itself from ZK upon "restore checkpoint", but keep the local state and only
repopulate itself when the master gains leader status (and clears itself when
leader status is lost).
==> This is a slightly more complex change, but not too big.
{quote}
The first step, executing {{CompletedCheckpointStore#addCheckpoint}} in the main
thread, is probably not a good idea, because it could block the JM's main thread
for quite a long time: I think this call currently also cleans up
older checkpoints. Checkpoint cleanup is already causing us some problems
even while it happens in the IO threads: FLINK-17073, FLINK-17860.
We would have to figure out whether transferring ownership can happen in the main
thread while the cleanup happens asynchronously. I think this is what you mean by your
2nd step, [~sewen]?
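
To make the split concrete, here is a minimal sketch of "transfer ownership in the main thread, clean up asynchronously". It is not Flink's actual code: the class {{OwnershipTransferSketch}}, its fields and its methods are all hypothetical names chosen for illustration, and the real {{CheckpointCoordinator}} and {{CompletedCheckpointStore}} are considerably more involved. The sketch assumes every method is called from a single main thread and that only the discard of subsumed checkpoints is handed to an IO executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for coordinator plus store; not Flink's real classes. */
class OwnershipTransferSketch {
    // Checkpoints still owned by the coordinator (checkpoint id -> metadata handle).
    private final Map<Long, String> pending = new HashMap<>();
    // Local view of completed checkpoints, oldest first, newest last.
    private final Deque<Long> completed = new ArrayDeque<>();
    private final int maxRetained;
    private final Executor ioExecutor;
    // Records what the IO executor discarded; synchronized because IO threads write to it.
    final List<Long> discarded = Collections.synchronizedList(new ArrayList<>());

    OwnershipTransferSketch(int maxRetained, Executor ioExecutor) {
        this.maxRetained = maxRetained;
        this.ioExecutor = ioExecutor;
    }

    /** Main thread only: registers a new pending checkpoint. */
    void startCheckpoint(long id) {
        pending.put(id, "meta-" + id);
    }

    /**
     * Main thread only: moves a checkpoint from "pending" to "completed" in one call.
     * Nothing here blocks on IO; the discard of subsumed checkpoints is only scheduled.
     */
    boolean completeCheckpoint(long id) {
        if (pending.remove(id) == null) {
            return false; // unknown, or already cancelled
        }
        completed.addLast(id);
        // In steady state this loop runs at most once per call.
        while (completed.size() > maxRetained) {
            long subsumed = completed.removeFirst();
            ioExecutor.execute(() -> discard(subsumed));
        }
        return true;
    }

    /** Main thread only: cancels what is still pending; completed checkpoints are untouched. */
    List<Long> shutdown() {
        List<Long> cancelled = new ArrayList<>(pending.keySet());
        pending.clear();
        return cancelled;
    }

    /** Main thread only: latest completed checkpoint from the local view, or null. */
    Long latest() {
        return completed.peekLast();
    }

    /** Runs on the IO executor; a placeholder for the potentially slow state deletion. */
    private void discard(long id) {
        discarded.add(id);
    }
}
```

Because the removal from the pending map and the addition to the local list happen in the same main-thread call, a concurrent shutdown can no longer cancel a checkpoint that is already complete. The open question from above remains whether the real store can be split this way.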
> Make all the non-IO operations in CheckpointCoordinator single-threaded
> -----------------------------------------------------------------------
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Reporter: Biao Liu
> Assignee: Biao Liu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently the ACK and declined message handling is executed in an IO thread.
> This is the only remaining place where non-IO operations are executed in an IO
> thread. It blocks introducing the main thread executor for
> {{CheckpointCoordinator}}. It will be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor
> will be introduced into {{CheckpointCoordinator}} to replace the timer
> thread. However, the timer thread will be kept (maybe only
> temporarily) to schedule periodic triggering, since FLINK-13848 is not
> accepted yet.