[ https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162168#comment-17162168 ]
Roman Khachatryan commented on FLINK-17073:
-------------------------------------------
As for the long-term solution, I'd propose the following:
# Extract *ZooKeeperCompletedCheckpointStore.tryRemoveCompletedCheckpoint*
(along with the *executor*) into a new class, e.g. a *CheckpointCleaner*
(sketched below), that maintains a queue of checkpoints to remove
# When a removal completes, it calls
*CheckpointCoordinator*.timer.execute(*executeQueuedRequest*)
# In *CheckpointRequestDecider.chooseRequestToExecute*, check
*CheckpointCleaner.numberOfCheckpointsToRemove* and return early if it's
greater than the threshold (see below)
# *CheckpointCleaner* reports this count in a thread-safe manner
(performance isn't an issue here)
This way, we ensure these properties:
# if a checkpoint can't be started, it's queued (if the queue has space) but
not started
# once a subsumed checkpoint is removed, we check the queue, and if possible,
start the checkpoint
It's possible that we check the queue twice (first in *chooseRequestToExecute*,
then in *CheckpointCleaner*), but that's OK: we'll just execute the next
request if possible.
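To make the idea more concrete, here is a rough sketch of what such a
*CheckpointCleaner* could look like. Everything below is illustrative only:
the class, *cleanCheckpoint*, *getNumberOfCheckpointsToClean* and the
*postCleanupAction* callback are names made up for this comment, not existing
API; I'm only assuming *CompletedCheckpoint.discardOnSubsume()* stays the
discard hook.
{code:java}
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;

/**
 * Illustrative sketch only (not existing Flink API): subsumed checkpoints are
 * queued for discard on the I/O executor, and after each discard a callback
 * (e.g. CheckpointCoordinator.timer.execute(executeQueuedRequest)) is invoked
 * so that queued checkpoint requests can be re-evaluated.
 */
public class CheckpointCleaner {

    private final Executor ioExecutor;
    private final AtomicInteger checkpointsToClean = new AtomicInteger();

    public CheckpointCleaner(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Number of checkpoints still waiting to be discarded (thread-safe). */
    public int getNumberOfCheckpointsToClean() {
        return checkpointsToClean.get();
    }

    /** Schedules an asynchronous discard and notifies the coordinator afterwards. */
    public void cleanCheckpoint(CompletedCheckpoint checkpoint, Runnable postCleanupAction) {
        checkpointsToClean.incrementAndGet();
        ioExecutor.execute(() -> {
            try {
                checkpoint.discardOnSubsume();
            } catch (Exception e) {
                // log and continue: a failed discard must not block future checkpoints
            } finally {
                checkpointsToClean.decrementAndGet();
                postCleanupAction.run();
            }
        });
    }
}
{code}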
Regarding adding an additional configuration parameter for the threshold, I
don't see much value in it. Conceptually, we don't want to proceed as long as
there are checkpoints to remove from the previous completion.
So we can use max-concurrent-checkpoints as a threshold (maybe multiplied by
some constant factor to account for spikes and savepoints).
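In code, the guard in *CheckpointRequestDecider.chooseRequestToExecute* could
then boil down to something like the following (again just a sketch; the
factor of 2 and the accessor names are placeholders, not actual Flink code):
{code:java}
/**
 * Sketch of the proposed guard in CheckpointRequestDecider#chooseRequestToExecute.
 * The factor of 2 and the cleaner accessor are placeholders for illustration.
 */
private boolean tooManyPendingCleanups(CheckpointCleaner cleaner, int maxConcurrentCheckpointAttempts) {
    final int cleanupThresholdFactor = 2; // absorbs spikes and savepoints
    return cleaner.getNumberOfCheckpointsToClean()
            > maxConcurrentCheckpointAttempts * cleanupThresholdFactor;
}
{code}
While this predicate is true, *chooseRequestToExecute* would simply keep the
request queued; the post-cleanup callback then re-runs *executeQueuedRequest*
once the backlog has shrunk.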
What do you think [~echauchot], [~SleePy]?
> Slow checkpoint cleanup causing OOMs
> ------------------------------------
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
> Reporter: Till Rohrmann
> Assignee: Etienne Chauchot
> Priority: Major
> Fix For: 1.12.0
>
>
> A user reported seeing a decline in checkpoint cleanup speed after
> upgrading from Flink 1.7.2 to 1.10.0. As a result, a lot of cleanup
> tasks wait in the execution queue, occupying memory. Ultimately, the JM
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we used the
> {{AkkaRpcService}} thread pool, which was a {{ForkJoinPool}} with a max
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as
> CPU cores. This change might have caused the decline in completed checkpoint
> discard throughput. This suspicion needs to be validated before trying to fix
> it!
> [1]
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E
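For reference, the executor change described in the quoted issue description
roughly amounts to the following difference (a simplified illustration, not
the actual {{HighAvailabilityServices}} wiring):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

// Simplified illustration of the suspected change, not the actual Flink wiring.
public class ExecutorComparison {
    public static void main(String[] args) {
        // Up to Flink 1.7.2: discards ran on the AkkaRpcService ForkJoinPool,
        // capped at a parallelism of 64.
        ForkJoinPool oldPool = new ForkJoinPool(64);

        // Since FLINK-11851: a dedicated ioExecutor backed by a FixedThreadPool
        // with one thread per CPU core, typically far fewer threads than before.
        ExecutorService ioExecutor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        oldPool.shutdown();
        ioExecutor.shutdown();
    }
}
{code}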