[
https://issues.apache.org/jira/browse/FLINK-26606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707789#comment-17707789
]
Matthias Pohl commented on FLINK-26606:
---------------------------------------
Moving the discussion from [PR #22121|https://github.com/apache/flink/pull/22121] into this parent issue:
{quote}
@XComp I'm interested in the background of your ticket. Based on the
description, I think the key point of this ticket is that "CompletedCheckpoints
are being discarded in CheckpointsCleaner". Could you provide the specific
code path for this? Additionally, I would like to learn more about "the contract
of StateObject#discardState". If these are clear, I would be happy to drive
the entire issue.
{quote}

Repeatable cleanup was introduced in 1.15 with
[FLIP-194|https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore].
The repeatable cleanup is executed when shutting down the Dispatcher. Usually,
this also cleans up the {{CompletedCheckpoints}} when shutting down the
{{CompletedCheckpointStore}}. But checkpoints are not only cleaned up when all
jobs are finished. We also clean up (unused) checkpoints while the job is
running to reduce memory usage. The number of checkpoints that are kept is
defined by
[state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained].
[FLIP-270|https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints]
deals with this cleanup that happens while the job is running.
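
To make the retention behavior concrete, here is a minimal, stdlib-only Java sketch. It is not Flink's implementation: {{ToyCheckpointStore}} and {{addAndSubsumeOldest}} are hypothetical names, and checkpoints are modeled as plain {{long}} IDs. It only shows how capping the store at a {{state.checkpoints.num-retained}}-like limit pushes the oldest checkpoint out, leaving the caller responsible for discarding it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a completed-checkpoint store; NOT Flink's CompletedCheckpointStore.
class ToyCheckpointStore {
    private final int maxRetained; // plays the role of state.checkpoints.num-retained
    private final Deque<Long> retained = new ArrayDeque<>(); // oldest checkpoint ID at the head

    ToyCheckpointStore(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    /** Adds a checkpoint and returns the IDs that were subsumed (removed) to respect the limit. */
    List<Long> addAndSubsumeOldest(long checkpointId) {
        retained.addLast(checkpointId);
        List<Long> subsumed = new ArrayList<>();
        while (retained.size() > maxRetained) {
            // The store drops its reference here; discarding the artifacts is up to the caller.
            subsumed.add(retained.removeFirst());
        }
        return subsumed;
    }

    /** Snapshot of the retained checkpoint IDs, oldest first. */
    List<Long> retainedIds() {
        return new ArrayList<>(retained);
    }
}
```

With a limit of 2, adding checkpoints 1, 2 and 3 leaves [2, 3] in the store and returns [1] to the caller for discarding.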

The logic for this can be found in
[CheckpointCoordinator.addCompletedCheckpointToStoreAndSubsumeOldest:1454|https://github.com/apache/flink/blob/eb17ec3f05d4bd512bc70ee79296d0b884894eaf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1454].
There, you can see how a newer checkpoint is added to the
{{CompletedCheckpointStore}} while the oldest one is removed and cleaned up.
Following the code, you will see that it uses the {{CheckpointsCleaner}}.
However, the {{CheckpointsCleaner}} makes only a single attempt to discard the
checkpoint. If this fails for any reason, no reference to the checkpoint is
kept; only a warning is logged. Semantically, this log message transfers
ownership of the checkpoint to the user: Flink no longer takes care of the
cleanup, and the user has to deal with it if needed. The idea of
[FLIP-270|https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints]
is to add repeatable cleanup to the {{CheckpointsCleaner}} as well.
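
The "try once, log, forget" behavior described above can be sketched as follows. This is a hypothetical, simplified model, not the actual {{CheckpointsCleaner}} code: {{Discardable}} is an assumed stand-in for a checkpoint (loosely modeled on {{StateObject#discardState}}), and {{SingleAttemptCleaner}} is an illustrative name. The point is that after a failed attempt, nothing but the log line remembers the checkpoint.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for a discardable checkpoint; loosely modeled on StateObject#discardState.
interface Discardable {
    void discardState() throws Exception;
}

// Hypothetical sketch of "try once, log, forget" cleanup; NOT Flink's CheckpointsCleaner.
class SingleAttemptCleaner {
    private static final Logger LOG = Logger.getLogger(SingleAttemptCleaner.class.getName());

    /**
     * Returns true if the discard succeeded. On failure, only a warning is logged
     * and no reference to the checkpoint is kept anywhere in this class.
     */
    boolean discardOnce(long checkpointId, Discardable checkpoint) {
        try {
            checkpoint.discardState();
            return true;
        } catch (Exception e) {
            // Ownership effectively passes to whoever reads this log line.
            LOG.warning("Could not discard checkpoint " + checkpointId + ": " + e.getMessage());
            return false;
        }
    }
}
```

The return value is the only signal of failure; once the method returns, the cleaner holds no state that would allow a later retry.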

Repeatable cleanup (as it is currently implemented) repeats the cleanup logic as
long as an exception is caught. If no exception occurs during cleanup, the code
assumes success and completes. This is where this issue comes into play: for
checkpoints, we have to make sure that the cleanup fails iff (i.e. if and only
if) it did not succeed and the resource is still present. That's why the
{{CompletedCheckpoint}} and its state need to be cleaned up idempotently: we
don't want an error to appear if the artifacts no longer exist. Otherwise,
repeatable cleanup would retry forever.
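
The following stdlib-only sketch illustrates why idempotency matters for repeatable cleanup. It is not Flink's retry code: {{RepeatableCleanupSketch}}, {{retryUntilNoException}} and the two discard helpers are hypothetical, and checkpoint artifacts are modeled as local files. The {{maxAttempts}} bound exists only so the example terminates; the repeatable cleanup described above keeps retrying.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; NOT Flink's repeatable cleanup implementation.
final class RepeatableCleanupSketch {
    @FunctionalInterface
    interface CleanupAction {
        void run() throws IOException;
    }

    /**
     * Repeats the action while it throws. maxAttempts only exists so this sketch terminates.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int retryUntilNoException(CleanupAction action, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt; // no exception: assume success
            } catch (IOException e) {
                // exception caught: try again
            }
        }
        return -1;
    }

    // Idempotent: an artifact that is already gone counts as successfully cleaned up.
    static void discardIdempotently(Path artifact) throws IOException {
        Files.deleteIfExists(artifact);
    }

    // Not idempotent: throws NoSuchFileException if the artifact is already gone.
    static void discardStrictly(Path artifact) throws IOException {
        Files.delete(artifact);
    }
}
```

If an earlier attempt already removed the artifact, the strict variant fails on every retry, which is the endless loop described above, while the idempotent variant completes on its first attempt.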
I hope this context helps.
> CompletedCheckpoints that failed to be discarded are not stored in the
> CompletedCheckpointStore
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-26606
> URL: https://issues.apache.org/jira/browse/FLINK-26606
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Coordination
> Affects Versions: 1.15.0
> Reporter: Matthias Pohl
> Priority: Major
>
> We introduced a repeatable per-job cleanup after the job reached a
> globally-terminated state. It also tries to clean up the
> {{CompletedCheckpointStore}}. But we missed one code path where the
> {{CheckpointsCleaner}} tries to discard {{CompletedCheckpoints}}. At that
> point, the {{CompletedCheckpointStore}} no longer holds any references to
> these {{CompletedCheckpoints}}, so if the discard fails, the shutdown at the
> end is not able to clean them up.
> We should not remove the {{CompletedCheckpoints}} from the
> {{CompletedCheckpointStore}} if the deletion failed. This would enable us to
> retry deleting these artifacts at the end of the job and consider them in the
> retryable cleanup as well.
> The documentation was updated to cover this issue. Fixing this issue should
> also include removing the corresponding paragraph from the documentation (see
> [related flink-docs PR|https://github.com/apache/flink/pull/19058]).
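
The fix proposed in the issue description (keep checkpoints whose deletion failed and retry them during the job's final cleanup) could look roughly like this. Again, this is a hypothetical sketch rather than Flink code: {{RetainingCheckpointStore}}, {{discardSubsumed}} and {{retryPendingDiscards}} are illustrative names, and each checkpoint is reduced to an ID plus a discard action.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the proposed behavior; NOT Flink's CompletedCheckpointStore.
class RetainingCheckpointStore {
    @FunctionalInterface
    interface DiscardAction {
        void discard() throws Exception;
    }

    // Checkpoints whose discard failed stay referenced here instead of being forgotten.
    private final Map<Long, DiscardAction> pendingDiscards = new LinkedHashMap<>();

    /** Tries to discard a subsumed checkpoint; on failure, keeps it for a later retry. */
    void discardSubsumed(long checkpointId, DiscardAction action) {
        try {
            action.discard();
        } catch (Exception e) {
            pendingDiscards.put(checkpointId, action);
        }
    }

    /**
     * Called during the job's (repeatable) cleanup: retries every pending discard once
     * and returns true only if nothing is left. A retrying caller repeats until true.
     */
    boolean retryPendingDiscards() {
        pendingDiscards.entrySet().removeIf(entry -> {
            try {
                entry.getValue().discard();
                return true; // discarded now: drop the reference
            } catch (Exception e) {
                return false; // still failing: keep it for the next round
            }
        });
        return pendingDiscards.isEmpty();
    }

    int pendingCount() {
        return pendingDiscards.size();
    }
}
```

Because a failed discard leaves the checkpoint in {{pendingDiscards}}, a retryable cleanup can call {{retryPendingDiscards()}} until it returns {{true}}; combined with an idempotent discard, a checkpoint whose artifacts are already gone simply drops out of the pending set.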