This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6b70fb0f6781a19938669177e69a5d31165f591
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 17 18:40:36 2021 +0100

    [FLINK-21351][checkpointing] Don't subsume last checkpoint
    
    When a savepoint is added to CompletedCheckpointStore,
    all previous checkpoints are removed if the number of
    checkpoints to retain is 1.
    
    This makes future incremental checkpoints invalid since
    they can refer to the discarded state.
---
 .../checkpoint/CheckpointSubsumeHelper.java        | 60 +++++++++++++---
 .../DefaultCompletedCheckpointStoreTest.java       | 83 +++++++++++++++++++++-
 2 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
index 7d3e7aa..da8e575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Deque;
+import java.util.Iterator;
+import java.util.Optional;
 
 /**
  * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
@@ -46,21 +47,60 @@ class CheckpointSubsumeHelper {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
 
     public static void subsume(
-            Deque<CompletedCheckpoint> checkpoints,
-            int numRetain,
-            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            Deque<CompletedCheckpoint> checkpoints, int numRetain, SubsumeAction subsumeAction)
             throws Exception {
         if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
             return;
         }
+        CompletedCheckpoint latest = checkpoints.peekLast();
+        Optional<CompletedCheckpoint> latestNotSavepoint = getLatestNotSavepoint(checkpoints);
+        Iterator<CompletedCheckpoint> iterator = checkpoints.iterator();
+        while (checkpoints.size() > numRetain && iterator.hasNext()) {
+            CompletedCheckpoint next = iterator.next();
+            if (canSubsume(next, latest, latestNotSavepoint)) {
+                iterator.remove();
+                try {
+                    subsumeAction.subsume(next);
+                } catch (Exception e) {
+                    LOG.warn("Fail to subsume the old checkpoint.", e);
+                }
+            }
+            // Don't break out from the loop to subsume intermediate savepoints
+        }
+    }
 
-        while (checkpoints.size() > numRetain) {
-            CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
-            try {
-                subsumeAction.accept(completedCheckpoint);
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
+    private static Optional<CompletedCheckpoint> getLatestNotSavepoint(
+            Deque<CompletedCheckpoint> completed) {
+        Iterator<CompletedCheckpoint> descendingIterator = completed.descendingIterator();
+        while (descendingIterator.hasNext()) {
+            CompletedCheckpoint next = descendingIterator.next();
+            if (!next.getProperties().isSavepoint()) {
+                return Optional.of(next);
             }
         }
+        return Optional.empty();
+    }
+
+    private static boolean canSubsume(
+            CompletedCheckpoint next,
+            CompletedCheckpoint latest,
+            Optional<CompletedCheckpoint> latestNonSavepoint) {
+        if (next == latest) {
+            return false;
+        } else if (next.getProperties().isSavepoint()) {
+            return true;
+        } else if (latest.getProperties().isSynchronous()) {
+            // If the job has stopped with a savepoint then it's safe to subsume because no future
+            // snapshots will be taken during this run
+            return true;
+        } else {
+            // Don't remove the latest non-savepoint lest invalidate future incremental snapshots
+            return latestNonSavepoint.filter(checkpoint -> checkpoint != next).isPresent();
+        }
+    }
+
+    @FunctionalInterface
+    interface SubsumeAction {
+        void subsume(CompletedCheckpoint checkpoint) throws Exception;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 58207af..5de9559 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.FlinkMatchers;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper
 import org.apache.flink.runtime.persistence.TestingStateHandleStore;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -44,8 +47,11 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -72,6 +78,56 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         executorService.shutdownNow();
     }
 
+    @Test
+    public void testAtLeastOneCheckpointRetained() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 3L);
+        CompletedCheckpoint sp2 = getCheckpoint(true, 4L);
+        CompletedCheckpoint sp3 = getCheckpoint(true, 5L);
+        testCheckpointRetention(1, asList(cp1, cp2, sp1, sp2, sp3), asList(cp2, sp3));
+    }
+
+    @Test
+    public void testOlderSavepointSubsumed() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 3L);
+        testCheckpointRetention(1, asList(cp1, sp1, cp2), asList(cp2));
+    }
+
+    @Test
+    public void testSubsumeAfterStoppingWithSavepoint() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+        CompletedCheckpoint stop = getCheckpoint(CheckpointProperties.forSyncSavepoint(false), 3L);
+        testCheckpointRetention(1, asList(cp1, sp1, stop), asList(stop));
+    }
+
+    @Test
+    public void testNotSubsumedIfNotNeeded() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+        CompletedCheckpoint cp3 = getCheckpoint(false, 3L);
+        testCheckpointRetention(3, asList(cp1, cp2, cp3), asList(cp1, cp2, cp3));
+    }
+
+    private void testCheckpointRetention(
+            int numRetain,
+            List<CompletedCheckpoint> completed,
+            List<CompletedCheckpoint> expectedRetained)
+            throws Exception {
+        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
+                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
+        final CompletedCheckpointStore completedCheckpointStore =
+                createCompletedCheckpointStore(stateHandleStore, numRetain);
+
+        for (CompletedCheckpoint c : completed) {
+            completedCheckpointStore.addCheckpoint(c, new CheckpointsCleaner(), () -> {});
+        }
+        assertEquals(expectedRetained, completedCheckpointStore.getAllCheckpoints());
+    }
+
     /**
      * We have three completed checkpoints(1, 2, 3) in the state handle store. We expect that {@link
      * DefaultCompletedCheckpointStore#recover()} should recover the sorted checkpoints by name.
@@ -298,8 +354,13 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
 
     private CompletedCheckpointStore createCompletedCheckpointStore(
             TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) {
+        return createCompletedCheckpointStore(stateHandleStore, 1);
+    }
+
+    private CompletedCheckpointStore createCompletedCheckpointStore(
+            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain) {
         return new DefaultCompletedCheckpointStore<>(
-                1,
+                toRetain,
                 stateHandleStore,
                 new CheckpointStoreUtil() {
                     @Override
@@ -314,4 +375,24 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
                 },
                 executorService);
     }
+
+    private CompletedCheckpoint getCheckpoint(boolean isSavepoint, long id) {
+        return getCheckpoint(
+                isSavepoint
+                        ? CheckpointProperties.forSavepoint(false)
+                        : CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                id);
+    }
+
+    private CompletedCheckpoint getCheckpoint(CheckpointProperties props, long id) {
+        return new CompletedCheckpoint(
+                new JobID(),
+                id,
+                0L,
+                0L,
+                Collections.emptyMap(),
+                Collections.emptyList(),
+                props,
+                new TestCompletedCheckpointStorageLocation());
+    }
 }

Reply via email to