This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1dcc44eecba552e62646298df510264f6f4f556c Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Feb 17 18:40:30 2021 +0100 [hotfix][checkpointing] Extract CheckpointSubsumeHelper This is a prerequisite refactoring for a subsequent bug fix. --- .../checkpoint/CheckpointSubsumeHelper.java | 66 ++++++++++++++++++++++ .../DefaultCompletedCheckpointStore.java | 45 +++++++-------- .../EmbeddedCompletedCheckpointStore.java | 5 +- .../StandaloneCompletedCheckpointStore.java | 10 +--- 4 files changed, 93 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java new file mode 100644 index 0000000..7d3e7aa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.function.ThrowingConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Deque; + +/** + * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint + * stores}. In general, checkpoints should be subsumed whenever state.checkpoints.num-retained is + * exceeded. + * + * <p>Additional considerations: + * + * <ul> + * <li>Savepoints must be stored in the same queue to prevent duplicates (@see <a + * href="https://issues.apache.org/jira/browse/FLINK-10354">FLINK-10354</a>). + * <li>To prevent unlimited queue growth, savepoints are also counted in num-retained together + * with checkpoints. + * <li>Savepoints' actual state should NOT be discarded when they are subsumed. + * <li>At least one (most recent) checkpoint (not savepoint) should be kept. Otherwise, subsequent + * incremental checkpoints may refer to a discarded state (@see <a + * href="https://issues.apache.org/jira/browse/FLINK-21351">FLINK-21351</a>). + * <li>Except when the job is stopped with a savepoint, when no future checkpoints will be made. 
+ * </ul> + */ +class CheckpointSubsumeHelper { + private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class); + + public static void subsume( + Deque<CompletedCheckpoint> checkpoints, + int numRetain, + ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction) + throws Exception { + if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) { + return; + } + + while (checkpoints.size() > numRetain) { + CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst(); + try { + subsumeAction.accept(completedCheckpoint); + } catch (Exception e) { + LOG.warn("Fail to subsume the old checkpoint.", e); + } + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java index 091f40d..f99ceea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java @@ -230,15 +230,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> completedCheckpoints.addLast(checkpoint); - // Everything worked, let's remove a previous checkpoint if necessary. 
- while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { - final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst(); - tryRemoveCompletedCheckpoint( - completedCheckpoint, - completedCheckpoint.shouldBeDiscardedOnSubsume(), - checkpointsCleaner, - postCleanup); - } + CheckpointSubsumeHelper.subsume( + completedCheckpoints, + maxNumberOfCheckpointsToRetain, + completedCheckpoint -> + tryRemoveCompletedCheckpoint( + completedCheckpoint, + completedCheckpoint.shouldBeDiscardedOnSubsume(), + checkpointsCleaner, + postCleanup)); LOG.debug("Added {} to {}.", checkpoint, path); } @@ -266,11 +266,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> LOG.info("Shutting down"); for (CompletedCheckpoint checkpoint : completedCheckpoints) { - tryRemoveCompletedCheckpoint( - checkpoint, - checkpoint.shouldBeDiscardedOnShutdown(jobStatus), - checkpointsCleaner, - postCleanup); + try { + tryRemoveCompletedCheckpoint( + checkpoint, + checkpoint.shouldBeDiscardedOnShutdown(jobStatus), + checkpointsCleaner, + postCleanup); + } catch (Exception e) { + LOG.warn("Fail to remove checkpoint during shutdown.", e); + } } completedCheckpoints.clear(); @@ -293,14 +297,11 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> CompletedCheckpoint completedCheckpoint, boolean shouldDiscard, CheckpointsCleaner checkpointsCleaner, - Runnable postCleanup) { - try { - if (tryRemove(completedCheckpoint.getCheckpointID())) { - checkpointsCleaner.cleanCheckpoint( - completedCheckpoint, shouldDiscard, postCleanup, ioExecutor); - } - } catch (Exception e) { - LOG.warn("Failed to subsume the old checkpoint", e); + Runnable postCleanup) + throws Exception { + if (tryRemove(completedCheckpoint.getCheckpointID())) { + checkpointsCleaner.cleanCheckpoint( + completedCheckpoint, shouldDiscard, postCleanup, ioExecutor); } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java index 1c75b6f..99de80b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java @@ -62,9 +62,8 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor throws Exception { checkpoints.addLast(checkpoint); - if (checkpoints.size() > maxRetainedCheckpoints) { - removeOldestCheckpoint(); - } + CheckpointSubsumeHelper.subsume( + checkpoints, maxRetainedCheckpoints, CompletedCheckpoint::discardOnSubsume); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index bc163cd..6147259 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -70,14 +70,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt checkpoints.addLast(checkpoint); - if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { - try { - CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(); - } catch (Exception e) { - LOG.warn("Fail to subsume the old checkpoint.", e); - } - } + CheckpointSubsumeHelper.subsume( + checkpoints, maxNumberOfCheckpointsToRetain, CompletedCheckpoint::discardOnSubsume); } @Override
