This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1dcc44eecba552e62646298df510264f6f4f556c
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 17 18:40:30 2021 +0100

    [hotfix][checkpointing] Extract CheckpointSubsumeHelper
    
    This is a prerequisite refactoring for a subsequent bug fix.
---
 .../checkpoint/CheckpointSubsumeHelper.java        | 66 ++++++++++++++++++++++
 .../DefaultCompletedCheckpointStore.java           | 45 +++++++--------
 .../EmbeddedCompletedCheckpointStore.java          |  5 +-
 .../StandaloneCompletedCheckpointStore.java        | 10 +---
 4 files changed, 93 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
new file mode 100644
index 0000000..7d3e7aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+/**
+ * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
+ * stores}. In general, checkpoints should be subsumed whenever {@code
+ * state.checkpoints.num-retained} is exceeded.
+ *
+ * <p>Additional considerations:
+ *
+ * <ul>
+ *   <li>Savepoints must be stored in the same queue to prevent duplicates (see <a
+ *       href="https://issues.apache.org/jira/browse/FLINK-10354">FLINK-10354</a>).
+ *   <li>To prevent unlimited queue growth, savepoints are also counted in num-retained together
+ *       with checkpoints.
+ *   <li>The actual state of a savepoint should NOT be discarded when the savepoint is subsumed.
+ *   <li>At least one (most recent) checkpoint (not savepoint) should be kept. Otherwise, subsequent
+ *       incremental checkpoints may refer to a discarded state (see <a
+ *       href="https://issues.apache.org/jira/browse/FLINK-21351">FLINK-21351</a>).
+ *   <li>The previous rule does not apply when the job is stopped with a savepoint, since no
+ *       further checkpoints will be made.
+ * </ul>
+ *
+ * <p>NOTE: {@link #subsume} currently only removes the oldest entries until at most {@code
+ * numRetain} remain. Whether a removed entry's state is discarded is decided by the {@code
+ * subsumeAction} supplied by the caller. The "keep at least one checkpoint" rule above is not
+ * enforced here yet.
+ */
+final class CheckpointSubsumeHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
+
+    /** Static utility; not meant to be instantiated. */
+    private CheckpointSubsumeHelper() {}
+
+    /**
+     * Removes entries from the head of {@code checkpoints} until at most {@code numRetain} remain.
+     * It applies {@code subsumeAction} to every removed entry.
+     *
+     * <p>Callers append new checkpoints with {@code addLast}, so the head of the deque holds the
+     * oldest entry.
+     *
+     * <p>Subsumption is best-effort. An exception thrown by {@code subsumeAction} is logged, and
+     * the remaining entries are still processed. An entry is removed from the deque before its
+     * action runs, so a failed action does not keep it in the deque.
+     *
+     * @param checkpoints retained entries, oldest first; modified in place
+     * @param numRetain maximum number of entries to keep; must not be negative
+     * @param subsumeAction cleanup applied to each removed entry
+     * @throws IllegalArgumentException if {@code numRetain} is negative
+     * @throws Exception declared for callers; failures of {@code subsumeAction} are caught and
+     *     logged here instead of being propagated
+     */
+    public static void subsume(
+            Deque<CompletedCheckpoint> checkpoints,
+            int numRetain,
+            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            throws Exception {
+        if (numRetain < 0) {
+            // Fail fast. A negative limit would otherwise drain the deque and then fail in
+            // removeFirst() with NoSuchElementException after entries were already removed.
+            throw new IllegalArgumentException(
+                    "numRetain must not be negative, was: " + numRetain);
+        }
+
+        // With numRetain >= 0, size() > numRetain implies the deque is non-empty.
+        while (checkpoints.size() > numRetain) {
+            final CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
+            try {
+                subsumeAction.accept(completedCheckpoint);
+            } catch (Exception e) {
+                // Best-effort: a failed cleanup must not prevent subsuming the remaining entries.
+                LOG.warn("Fail to subsume the old checkpoint.", e);
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 091f40d..f99ceea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -230,15 +230,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
 
         completedCheckpoints.addLast(checkpoint);
 
-        // Everything worked, let's remove a previous checkpoint if necessary.
-        while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-            final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
-            tryRemoveCompletedCheckpoint(
-                    completedCheckpoint,
-                    completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                    checkpointsCleaner,
-                    postCleanup);
-        }
+        CheckpointSubsumeHelper.subsume(
+                completedCheckpoints,
+                maxNumberOfCheckpointsToRetain,
+                completedCheckpoint ->
+                        tryRemoveCompletedCheckpoint(
+                                completedCheckpoint,
+                                
completedCheckpoint.shouldBeDiscardedOnSubsume(),
+                                checkpointsCleaner,
+                                postCleanup));
 
         LOG.debug("Added {} to {}.", checkpoint, path);
     }
@@ -266,11 +266,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             LOG.info("Shutting down");
 
             for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-                tryRemoveCompletedCheckpoint(
-                        checkpoint,
-                        checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
-                        checkpointsCleaner,
-                        postCleanup);
+                try {
+                    tryRemoveCompletedCheckpoint(
+                            checkpoint,
+                            checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
+                            checkpointsCleaner,
+                            postCleanup);
+                } catch (Exception e) {
+                    LOG.warn("Fail to remove checkpoint during shutdown.", e);
+                }
             }
 
             completedCheckpoints.clear();
@@ -293,14 +297,11 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             CompletedCheckpoint completedCheckpoint,
             boolean shouldDiscard,
             CheckpointsCleaner checkpointsCleaner,
-            Runnable postCleanup) {
-        try {
-            if (tryRemove(completedCheckpoint.getCheckpointID())) {
-                checkpointsCleaner.cleanCheckpoint(
-                        completedCheckpoint, shouldDiscard, postCleanup, 
ioExecutor);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to subsume the old checkpoint", e);
+            Runnable postCleanup)
+            throws Exception {
+        if (tryRemove(completedCheckpoint.getCheckpointID())) {
+            checkpointsCleaner.cleanCheckpoint(
+                    completedCheckpoint, shouldDiscard, postCleanup, 
ioExecutor);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 1c75b6f..99de80b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -62,9 +62,8 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor
             throws Exception {
         checkpoints.addLast(checkpoint);
 
-        if (checkpoints.size() > maxRetainedCheckpoints) {
-            removeOldestCheckpoint();
-        }
+        CheckpointSubsumeHelper.subsume(
+                checkpoints, maxRetainedCheckpoints, 
CompletedCheckpoint::discardOnSubsume);
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index bc163cd..6147259 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -70,14 +70,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 
         checkpoints.addLast(checkpoint);
 
-        if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-            try {
-                CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
-                checkpointToSubsume.discardOnSubsume();
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
-            }
-        }
+        CheckpointSubsumeHelper.subsume(
+                checkpoints, maxNumberOfCheckpointsToRetain, 
CompletedCheckpoint::discardOnSubsume);
     }
 
     @Override

Reply via email to