This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08794a6b2188268c3de6b134f399228415c7fc8f
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Dec 6 18:21:52 2021 +0100

    [FLINK-25192][checkpointing] Implement no-claim mode support
    
    We introduce NO_CLAIM restore mode in which we do not take ownership of
    the restored checkpoint. We do enforce the first, successful checkpoint
    after a restore to be "full" meaning it can not share any artefacts with
    the initial one.
    
    Taking a "full" snapshot needs support from state backend, therefore we
    add a method supportsNoClaimMode to the Snapshottable interface, so
    that state backend can add support incrementally. If a state backend
    does not support forces snapshots, user can switch to either the LEGACY
    mode or CLAIM mode.
    
    This closes #18086
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |   2 +-
 .../generated/savepoint_config_configuration.html  |   4 +-
 .../apache/flink/client/cli/CliFrontendParser.java |   6 +-
 .../flink/client/cli/CliFrontendRunTest.java       |  20 +++
 .../flink/streaming/tests/StubStateBackend.java    |   5 +
 .../src/test/resources/rest_api_v1.snapshot        |   4 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  24 +++-
 .../runtime/checkpoint/CheckpointOptions.java      |  54 ++++---
 .../runtime/checkpoint/CheckpointProperties.java   |  45 +++++-
 .../flink/runtime/checkpoint/CheckpointType.java   |  42 +++++-
 .../runtime/checkpoint/CompletedCheckpoint.java    |   5 +-
 .../network/api/serialization/EventSerializer.java |   6 +
 .../apache/flink/runtime/jobgraph/RestoreMode.java |  12 +-
 .../runtime/jobgraph/SavepointConfigOptions.java   |   2 +-
 .../runtime/jobgraph/SavepointRestoreSettings.java |   2 +-
 .../apache/flink/runtime/state/StateBackend.java   |  16 +++
 .../runtime/state/filesystem/FsStateBackend.java   |   6 +
 .../runtime/state/hashmap/HashMapStateBackend.java |   6 +
 .../runtime/state/memory/MemoryStateBackend.java   |   6 +
 .../CheckpointCoordinatorTriggeringTest.java       | 159 +++++++++++++++++++++
 .../runtime/checkpoint/CheckpointOptionsTest.java  |  23 ++-
 .../runtime/checkpoint/CheckpointTypeTest.java     |   2 +
 .../checkpoint/CompletedCheckpointTest.java        |  13 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  10 +-
 .../checkpoint/RestoredCheckpointStatsTest.java    |   2 +-
 .../api/serialization/EventSerializerTest.java     |  10 +-
 .../PipelinedSubpartitionWithReadViewTest.java     |   3 +
 .../consumer/ChannelStatePersisterTest.java        |   7 +-
 .../partition/consumer/LocalInputChannelTest.java  |   9 +-
 .../consumer/RecoveredInputChannelTest.java        |   6 +-
 .../partition/consumer/RemoteInputChannelTest.java |  24 +++-
 .../operators/testutils/DummyEnvironment.java      |   3 +-
 .../state/EmbeddedRocksDBStateBackend.java         |   7 +
 .../streaming/state/RocksDBStateBackend.java       |   7 +
 .../snapshot/RocksIncrementalSnapshotStrategy.java |  23 ++-
 .../runtime/tasks/SourceOperatorStreamTask.java    |   6 +
 .../streaming/runtime/tasks/SourceStreamTask.java  |   6 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  20 +++
 .../checkpointing/AlternatingCheckpointsTest.java  | 102 +++++++++----
 .../checkpointing/CheckpointedInputGateTest.java   |   5 +-
 .../io/checkpointing/InputProcessorUtilTest.java   |   2 +
 .../io/checkpointing/UnalignedCheckpointsTest.java |   8 +-
 ...tStreamTaskChainedSourcesCheckpointingTest.java |   8 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |   8 +-
 .../tasks/StreamTaskFinalCheckpointsTest.java      |   8 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  47 ++++++
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   6 +-
 .../flink-sql-client/src/test/resources/sql/set.q  |  10 +-
 .../flink/test/checkpointing/SavepointITCase.java  | 110 ++++++++++++++
 49 files changed, 796 insertions(+), 125 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index d4f7354..b64743c 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -871,7 +871,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     },
     "restoreMode" : {
       "type" : "string",
-      "enum" : [ "CLAIM", "LEGACY" ]
+      "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
     },
     "savepointPath" : {
       "type" : "string"
diff --git 
a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html 
b/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
index 62a25a6..2a40bd7 100644
--- a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
@@ -10,9 +10,9 @@
     <tbody>
         <tr>
             <td><h5>execution.savepoint-restore-mode</h5></td>
-            <td style="word-wrap: break-word;">LEGACY</td>
+            <td style="word-wrap: break-word;">NO_CLAIM</td>
             <td><p>Enum</p></td>
-            <td>Describes the mode how Flink should restore from the given 
savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": 
Flink will take ownership of the given snapshot. It will clean the snapshot 
once it is subsumed by newer ones.</li><li>"LEGACY": Flink will not claim 
ownership of the snapshot and will not delete the files. However, it can 
directly depend on the existence of the files of the restored checkpoint. It 
might not be safe to delete checkpoints [...]
+            <td>Describes the mode how Flink should restore from the given 
savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": 
Flink will take ownership of the given snapshot. It will clean the snapshot 
once it is subsumed by newer ones.</li><li>"NO_CLAIM": Flink will not claim 
ownership of the snapshot files. However it will make sure it does not depend 
on any artefacts from the restored snapshot. In order to do that, Flink will 
take the first checkpoint as a f [...]
         </tr>
         <tr>
             <td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index a5f31c5..ee5384b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -138,8 +138,10 @@ public class CliFrontendParser {
                     true,
                     "Defines how should we restore from the given savepoint. 
Supported options: "
                             + "[claim - claim ownership of the savepoint and 
delete once it is"
-                            + " subsumed, legacy (default) - do not assume 
ownership of the"
-                            + " savepoint files.");
+                            + " subsumed, no_claim (default) - do not claim 
ownership, the first"
+                            + " checkpoint will not reuse any files from the 
restored one, legacy "
+                            + "- the old behaviour, do not assume ownership of 
the savepoint files,"
+                            + " but can reuse some shared files.");
 
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 861bb1f..2d10ad11 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -171,6 +171,26 @@ public class CliFrontendRunTest extends 
CliFrontendTestBase {
         assertTrue(savepointSettings.allowNonRestoredState());
     }
 
+    @Test
+    public void testNoClaimRestoreModeParsing() throws Exception {
+        // test configure savepoint with claim mode
+        String[] parameters = {
+            "-s", "expectedSavepointPath", "-n", "-restoreMode", "no_claim", 
getTestJarPath()
+        };
+
+        CommandLine commandLine =
+                CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, 
parameters, true);
+        ProgramOptions programOptions = ProgramOptions.create(commandLine);
+        ExecutionConfigAccessor executionOptions =
+                ExecutionConfigAccessor.fromProgramOptions(programOptions, 
Collections.emptyList());
+
+        SavepointRestoreSettings savepointSettings = 
executionOptions.getSavepointRestoreSettings();
+        assertTrue(savepointSettings.restoreSavepoint());
+        assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode());
+        assertEquals("expectedSavepointPath", 
savepointSettings.getRestorePath());
+        assertTrue(savepointSettings.allowNonRestoredState());
+    }
+
     @Test(expected = CliArgsException.class)
     public void testUnrecognizedOption() throws Exception {
         // test unrecognized option
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
index fa9f8c0..d78208e 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
@@ -94,4 +94,9 @@ final class StubStateBackend implements StateBackend {
         return backend.createOperatorStateBackend(
                 env, operatorIdentifier, stateHandles, cancelStreamRegistry);
     }
+
+    @Override
+    public boolean supportsNoClaimRestoreMode() {
+        return backend.supportsNoClaimRestoreMode();
+    }
 }
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 8ca5d08..551ffbe 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -438,7 +438,7 @@
         },
         "restoreMode" : {
           "type" : "string",
-          "enum" : [ "CLAIM", "LEGACY" ]
+          "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
         }
       }
     },
@@ -3378,4 +3378,4 @@
       }
     }
   } ]
-}
+}
\ No newline at end of file
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index feb8580..310ff51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -218,6 +218,7 @@ public class CheckpointCoordinator {
     private final ExecutionAttemptMappingProvider attemptMappingProvider;
 
     private boolean baseLocationsForCheckpointInitialized = false;
+    private boolean forceFullSnapshot;
 
     // 
--------------------------------------------------------------------------------------------
 
@@ -684,9 +685,16 @@ public class CheckpointCoordinator {
         // no exception, no discarding, everything is OK
         final long checkpointId = checkpoint.getCheckpointID();
 
+        final CheckpointType type;
+        if (this.forceFullSnapshot && !request.props.isSavepoint()) {
+            type = CheckpointType.FULL_CHECKPOINT;
+        } else {
+            type = request.props.getCheckpointType();
+        }
+
         final CheckpointOptions checkpointOptions =
                 CheckpointOptions.forConfig(
-                        request.props.getCheckpointType(),
+                        type,
                         
checkpoint.getCheckpointStorageLocation().getLocationReference(),
                         isExactlyOnceMode,
                         unalignedCheckpointsEnabled,
@@ -1333,8 +1341,13 @@ public class CheckpointCoordinator {
             List<ExecutionVertex> tasksToAbort)
             throws CheckpointException {
         try {
-            return completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
-                    completedCheckpoint, checkpointsCleaner, 
this::scheduleTriggerRequest);
+            final CompletedCheckpoint subsumedCheckpoint =
+                    completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
+                            completedCheckpoint, checkpointsCleaner, 
this::scheduleTriggerRequest);
+            // reset the force full snapshot flag, we should've completed at 
least one full
+            // snapshot by now
+            this.forceFullSnapshot = false;
+            return subsumedCheckpoint;
         } catch (Exception exception) {
             if (exception instanceof PossibleInconsistentStateException) {
                 LOG.warn(
@@ -1591,6 +1604,8 @@ public class CheckpointCoordinator {
 
             LOG.info("Restoring job {} from {}.", job, latest);
 
+            this.forceFullSnapshot = latest.getProperties().isUnclaimed();
+
             // re-assign the task states
             final Map<OperatorID, OperatorState> operatorStates = 
extractOperatorStates(latest);
 
@@ -1695,6 +1710,9 @@ public class CheckpointCoordinator {
             case LEGACY:
                 checkpointProperties = 
CheckpointProperties.forSavepoint(false);
                 break;
+            case NO_CLAIM:
+                checkpointProperties = 
CheckpointProperties.forUnclaimedSnapshot();
+                break;
             default:
                 throw new IllegalArgumentException("Unknown snapshot restore 
mode");
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 3b7e2b6..8be24d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -73,30 +73,29 @@ public class CheckpointOptions implements Serializable {
                 type, location, AlignmentType.ALIGNED, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
-    public static CheckpointOptions 
unaligned(CheckpointStorageLocationReference location) {
+    public static CheckpointOptions unaligned(
+            CheckpointType type, CheckpointStorageLocationReference location) {
+        checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
         return new CheckpointOptions(
-                CheckpointType.CHECKPOINT,
-                location,
-                AlignmentType.UNALIGNED,
-                NO_ALIGNED_CHECKPOINT_TIME_OUT);
+                type, location, AlignmentType.UNALIGNED, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     public static CheckpointOptions alignedWithTimeout(
-            CheckpointStorageLocationReference location, long 
alignedCheckpointTimeout) {
+            CheckpointType type,
+            CheckpointStorageLocationReference location,
+            long alignedCheckpointTimeout) {
+        checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
         return new CheckpointOptions(
-                CheckpointType.CHECKPOINT,
-                location,
-                AlignmentType.ALIGNED,
-                alignedCheckpointTimeout);
+                type, location, AlignmentType.ALIGNED, 
alignedCheckpointTimeout);
     }
 
     private static CheckpointOptions forceAligned(
-            CheckpointStorageLocationReference location, long 
alignedCheckpointTimeout) {
+            CheckpointType type,
+            CheckpointStorageLocationReference location,
+            long alignedCheckpointTimeout) {
+        checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
         return new CheckpointOptions(
-                CheckpointType.CHECKPOINT,
-                location,
-                AlignmentType.FORCED_ALIGNED,
-                alignedCheckpointTimeout);
+                type, location, AlignmentType.FORCED_ALIGNED, 
alignedCheckpointTimeout);
     }
 
     public static CheckpointOptions forConfig(
@@ -113,9 +112,9 @@ public class CheckpointOptions implements Serializable {
             return alignedNoTimeout(checkpointType, locationReference);
         } else if (alignedCheckpointTimeout == 0
                 || alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT) 
{
-            return unaligned(locationReference);
+            return unaligned(checkpointType, locationReference);
         } else {
-            return alignedWithTimeout(locationReference, 
alignedCheckpointTimeout);
+            return alignedWithTimeout(checkpointType, locationReference, 
alignedCheckpointTimeout);
         }
     }
 
@@ -190,15 +189,15 @@ public class CheckpointOptions implements Serializable {
     public CheckpointOptions withUnalignedSupported() {
         if (alignmentType == AlignmentType.FORCED_ALIGNED) {
             return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT
-                    ? alignedWithTimeout(targetLocation, 
alignedCheckpointTimeout)
-                    : unaligned(targetLocation);
+                    ? alignedWithTimeout(checkpointType, targetLocation, 
alignedCheckpointTimeout)
+                    : unaligned(checkpointType, targetLocation);
         }
         return this;
     }
 
     public CheckpointOptions withUnalignedUnsupported() {
         if (isUnalignedCheckpoint() || isTimeoutable()) {
-            return forceAligned(targetLocation, alignedCheckpointTimeout);
+            return forceAligned(checkpointType, targetLocation, 
alignedCheckpointTimeout);
         }
         return this;
     }
@@ -228,18 +227,17 @@ public class CheckpointOptions implements Serializable {
 
     @Override
     public String toString() {
-        return "CheckpointOptions {"
-                + "checkpointType = "
+        return "CheckpointOptions{"
+                + "checkpointType="
                 + checkpointType
-                + ", targetLocation = "
+                + ", targetLocation="
                 + targetLocation
-                + ", alignment = "
+                + ", alignmentType="
                 + alignmentType
-                + ", alignedCheckpointTimeout = "
+                + ", alignedCheckpointTimeout="
                 + alignedCheckpointTimeout
-                + "}";
+                + '}';
     }
-
     // ------------------------------------------------------------------------
     //  Factory methods
     // ------------------------------------------------------------------------
@@ -255,6 +253,6 @@ public class CheckpointOptions implements Serializable {
 
     public CheckpointOptions toUnaligned() {
         checkState(alignmentType == AlignmentType.ALIGNED);
-        return unaligned(targetLocation);
+        return unaligned(checkpointType, targetLocation);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 91c9374..2fa68dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 
 import java.io.Serializable;
 
@@ -53,6 +54,8 @@ public class CheckpointProperties implements Serializable {
     private final boolean discardFailed;
     private final boolean discardSuspended;
 
+    private final boolean unclaimed;
+
     @VisibleForTesting
     CheckpointProperties(
             boolean forced,
@@ -61,7 +64,8 @@ public class CheckpointProperties implements Serializable {
             boolean discardFinished,
             boolean discardCancelled,
             boolean discardFailed,
-            boolean discardSuspended) {
+            boolean discardSuspended,
+            boolean unclaimed) {
 
         this.forced = forced;
         this.checkpointType = checkNotNull(checkpointType);
@@ -70,6 +74,7 @@ public class CheckpointProperties implements Serializable {
         this.discardCancelled = discardCancelled;
         this.discardFailed = discardFailed;
         this.discardSuspended = discardSuspended;
+        this.unclaimed = unclaimed;
     }
 
     // ------------------------------------------------------------------------
@@ -89,6 +94,11 @@ public class CheckpointProperties implements Serializable {
         return forced;
     }
 
+    /** Returns whether the checkpoint should be restored in a {@link 
RestoreMode#NO_CLAIM} mode. */
+    public boolean isUnclaimed() {
+        return unclaimed;
+    }
+
     // ------------------------------------------------------------------------
     // Garbage collection behaviour
     // ------------------------------------------------------------------------
@@ -240,11 +250,11 @@ public class CheckpointProperties implements Serializable 
{
 
     private static final CheckpointProperties SAVEPOINT =
             new CheckpointProperties(
-                    true, CheckpointType.SAVEPOINT, false, false, false, 
false, false);
+                    true, CheckpointType.SAVEPOINT, false, false, false, 
false, false, false);
 
     private static final CheckpointProperties SAVEPOINT_NO_FORCE =
             new CheckpointProperties(
-                    false, CheckpointType.SAVEPOINT, false, false, false, 
false, false);
+                    false, CheckpointType.SAVEPOINT, false, false, false, 
false, false, false);
 
     private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED =
             new CheckpointProperties(
@@ -254,7 +264,8 @@ public class CheckpointProperties implements Serializable {
                     true, // Delete on success
                     true, // Delete on cancellation
                     true, // Delete on failure
-                    true); // Delete on suspension
+                    true, // Delete on suspension
+                    false);
 
     private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE =
             new CheckpointProperties(
@@ -264,7 +275,8 @@ public class CheckpointProperties implements Serializable {
                     true, // Delete on success
                     true, // Delete on cancellation
                     false, // Retain on failure
-                    true); // Delete on suspension
+                    true, // Delete on suspension
+                    false);
 
     private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_CANCELLATION =
             new CheckpointProperties(
@@ -274,7 +286,8 @@ public class CheckpointProperties implements Serializable {
                     true, // Delete on success
                     false, // Retain on cancellation
                     false, // Retain on failure
-                    false); // Retain on suspension
+                    false, // Retain on suspension
+                    false);
 
     /**
      * Creates the checkpoint properties for a (manually triggered) savepoint.
@@ -288,6 +301,25 @@ public class CheckpointProperties implements Serializable {
         return forced ? SAVEPOINT : SAVEPOINT_NO_FORCE;
     }
 
+    /**
+     * Creates the checkpoint properties for a snapshot restored in {@link 
RestoreMode#NO_CLAIM}.
+     * Those properties should not be used when triggering a 
checkpoint/savepoint. They're useful
+     * when restoring a {@link CompletedCheckpointStore} after a JM failover.
+     *
+     * @return Checkpoint properties for a snapshot restored in {@link 
RestoreMode#NO_CLAIM}.
+     */
+    public static CheckpointProperties forUnclaimedSnapshot() {
+        return new CheckpointProperties(
+                false,
+                CheckpointType.SAVEPOINT, // unclaimed snapshot is similar to 
a savepoint
+                false,
+                false,
+                false,
+                false,
+                false,
+                true);
+    }
+
     public static CheckpointProperties forSyncSavepoint(boolean forced, 
boolean terminate) {
         return new CheckpointProperties(
                 forced,
@@ -296,6 +328,7 @@ public class CheckpointProperties implements Serializable {
                 false,
                 false,
                 false,
+                false,
                 false);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
index b378ae1..f133afc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
@@ -22,16 +22,28 @@ package org.apache.flink.runtime.checkpoint;
 public enum CheckpointType {
 
     /** A checkpoint, full or incremental. */
-    CHECKPOINT(false, PostCheckpointAction.NONE, "Checkpoint"),
+    CHECKPOINT(
+            false, PostCheckpointAction.NONE, "Checkpoint", 
SharingFilesStrategy.FORWARD_BACKWARD),
 
     /** A regular savepoint. */
-    SAVEPOINT(true, PostCheckpointAction.NONE, "Savepoint"),
+    SAVEPOINT(true, PostCheckpointAction.NONE, "Savepoint", 
SharingFilesStrategy.NO_SHARING),
 
     /** A savepoint taken while suspending the job. */
-    SAVEPOINT_SUSPEND(true, PostCheckpointAction.SUSPEND, "Suspend Savepoint"),
+    SAVEPOINT_SUSPEND(
+            true,
+            PostCheckpointAction.SUSPEND,
+            "Suspend Savepoint",
+            SharingFilesStrategy.NO_SHARING),
 
     /** A savepoint taken while terminating the job. */
-    SAVEPOINT_TERMINATE(true, PostCheckpointAction.TERMINATE, "Terminate 
Savepoint");
+    SAVEPOINT_TERMINATE(
+            true,
+            PostCheckpointAction.TERMINATE,
+            "Terminate Savepoint",
+            SharingFilesStrategy.NO_SHARING),
+
+    FULL_CHECKPOINT(
+            false, PostCheckpointAction.NONE, "Full Checkpoint", 
SharingFilesStrategy.FORWARD);
 
     private final boolean isSavepoint;
 
@@ -39,14 +51,18 @@ public enum CheckpointType {
 
     private final String name;
 
+    private final SharingFilesStrategy sharingFilesStrategy;
+
     CheckpointType(
             final boolean isSavepoint,
             final PostCheckpointAction postCheckpointAction,
-            final String name) {
+            final String name,
+            SharingFilesStrategy sharingFilesStrategy) {
 
         this.isSavepoint = isSavepoint;
         this.postCheckpointAction = postCheckpointAction;
         this.name = name;
+        this.sharingFilesStrategy = sharingFilesStrategy;
     }
 
     public boolean isSavepoint() {
@@ -77,10 +93,26 @@ public enum CheckpointType {
         return name;
     }
 
+    public SharingFilesStrategy getSharingFilesStrategy() {
+        return sharingFilesStrategy;
+    }
+
     /** What's the intended action after the checkpoint (relevant for stopping 
with savepoint). */
     public enum PostCheckpointAction {
         NONE,
         SUSPEND,
         TERMINATE
     }
+
+    /** Defines what files can be shared across snapshots. */
+    public enum SharingFilesStrategy {
+        // current snapshot can share files with previous snapshots.
+        // new snapshots can use files of the current snapshot
+        FORWARD_BACKWARD,
+        // later snapshots can share files with the current snapshot
+        FORWARD,
+        // current snapshot can not use files of older ones, future snapshots 
can
+        // not use files of the current one.
+        NO_SHARING;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index efd07be..1972427 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -204,7 +204,10 @@ public class CompletedCheckpoint implements Serializable, 
Checkpoint {
      * @param sharedStateRegistry The registry where shared states are 
registered
      */
     public void registerSharedStatesAfterRestored(SharedStateRegistry 
sharedStateRegistry) {
-        sharedStateRegistry.registerAll(operatorStates.values(), checkpointID);
+        // in claim mode we should not register any shared handles
+        if (!props.isUnclaimed()) {
+            sharedStateRegistry.registerAll(operatorStates.values(), 
checkpointID);
+        }
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index ba943a1..a44b729 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -80,6 +80,8 @@ public class EventSerializer {
 
     private static final int CHECKPOINT_TYPE_SAVEPOINT_TERMINATE = 3;
 
+    private static final int CHECKPOINT_TYPE_FULL_CHECKPOINT = 4;
+
     // ------------------------------------------------------------------------
     //  Serialization Logic
     // ------------------------------------------------------------------------
@@ -229,6 +231,8 @@ public class EventSerializer {
         final int typeInt;
         if (checkpointType == CheckpointType.CHECKPOINT) {
             typeInt = CHECKPOINT_TYPE_CHECKPOINT;
+        } else if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+            typeInt = CHECKPOINT_TYPE_FULL_CHECKPOINT;
         } else if (checkpointType == CheckpointType.SAVEPOINT) {
             typeInt = CHECKPOINT_TYPE_SAVEPOINT;
         } else if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
@@ -268,6 +272,8 @@ public class EventSerializer {
         final CheckpointType checkpointType;
         if (checkpointTypeCode == CHECKPOINT_TYPE_CHECKPOINT) {
             checkpointType = CheckpointType.CHECKPOINT;
+        } else if (checkpointTypeCode == CHECKPOINT_TYPE_FULL_CHECKPOINT) {
+            checkpointType = CheckpointType.FULL_CHECKPOINT;
         } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
             checkpointType = CheckpointType.SAVEPOINT;
         } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_SUSPEND) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
index 62c0170..10a4f2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
@@ -31,10 +31,16 @@ public enum RestoreMode implements DescribedEnum {
     CLAIM(
             "Flink will take ownership of the given snapshot. It will clean 
the"
                     + " snapshot once it is subsumed by newer ones."),
+    NO_CLAIM(
+            "Flink will not claim ownership of the snapshot files. However it 
will make sure it"
+                    + " does not depend on any artefacts from the restored 
snapshot. In order to do that,"
+                    + " Flink will take the first checkpoint as a full one, 
which means it might"
+                    + " reupload/duplicate files that are part of the restored 
checkpoint."),
     LEGACY(
-            "Flink will not claim ownership of the snapshot and will not 
delete the files. However, "
-                    + "it can directly depend on the existence of the files of 
the restored checkpoint. "
-                    + "It might not be safe to delete checkpoints that were 
restored in legacy mode ");
+            "This is the mode in which Flink worked so far. It will not claim 
ownership of the"
+                    + " snapshot and will not delete the files. However, it 
can directly depend on"
+                    + " the existence of the files of the restored checkpoint. 
It might not be safe"
+                    + " to delete checkpoints that were restored in legacy 
mode ");
 
     private final String description;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index 17d9730..e8a9dd8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -52,7 +52,7 @@ public class SavepointConfigOptions {
     public static final ConfigOption<RestoreMode> RESTORE_MODE =
             key("execution.savepoint-restore-mode")
                     .enumType(RestoreMode.class)
-                    .defaultValue(RestoreMode.LEGACY)
+                    .defaultValue(RestoreMode.NO_CLAIM)
                     .withDescription(
                             "Describes the mode how Flink should restore from 
the given"
                                     + " savepoint or retained checkpoint.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 9ab25df..256eb77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -33,7 +33,7 @@ public class SavepointRestoreSettings implements Serializable 
{
 
     /** No restore should happen. */
     private static final SavepointRestoreSettings NONE =
-            new SavepointRestoreSettings(null, false, RestoreMode.LEGACY);
+            new SavepointRestoreSettings(null, false, RestoreMode.NO_CLAIM);
 
     /** Savepoint restore path. */
     private final String restorePath;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 5952372..a551cfa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -23,7 +23,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
@@ -172,4 +175,17 @@ public interface StateBackend extends java.io.Serializable 
{
     default boolean useManagedMemory() {
         return false;
     }
+
+    /**
+     * Tells if a state backend supports the {@link RestoreMode#NO_CLAIM} mode.
+     *
+     * <p>If a state backend supports {@code NO_CLAIM} mode, it should create 
an independent
+     * snapshot when it receives {@link CheckpointType#FULL_CHECKPOINT} in 
{@link
+     * Snapshotable#snapshot(long, long, CheckpointStreamFactory, 
CheckpointOptions)}.
+     *
+     * @return If the state backend supports {@link RestoreMode#NO_CLAIM} mode.
+     */
+    default boolean supportsNoClaimRestoreMode() {
+        return false;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 68560ac..cacc1c2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -496,6 +496,12 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
         return true;
     }
 
+    @Override
+    public boolean supportsNoClaimRestoreMode() {
+        // we never share any files, all snapshots are full
+        return true;
+    }
+
     // ------------------------------------------------------------------------
     //  Reconfiguration
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
index 17647bd..8e44be8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
@@ -92,6 +92,12 @@ public class HashMapStateBackend extends 
AbstractStateBackend implements Configu
     }
 
     @Override
+    public boolean supportsNoClaimRestoreMode() {
+        // we never share any files, all snapshots are full
+        return true;
+    }
+
+    @Override
     public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
             Environment env,
             JobID jobID,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 70aa48e..c98995b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -286,6 +286,12 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend
         return true;
     }
 
+    @Override
+    public boolean supportsNoClaimRestoreMode() {
+        // we never share any files, all snapshots are full
+        return true;
+    }
+
     // ------------------------------------------------------------------------
     //  Reconfiguration
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 5da3f13..0c46390 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -36,10 +38,13 @@ import 
org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +57,7 @@ import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -64,6 +70,8 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
 
     private ManuallyTriggeredScheduledExecutor 
manuallyTriggeredScheduledExecutor;
 
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
     @Before
     public void setUp() throws Exception {
         manuallyTriggeredScheduledExecutor = new 
ManuallyTriggeredScheduledExecutor();
@@ -166,6 +174,157 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
         }
     }
 
+    @Test
+    public void testTriggeringFullSnapshotAfterJobmasterFailover() throws 
Exception {
+        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway 
gateway =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTaskManagerGateway(gateway)
+                        .build();
+
+        ExecutionVertex vertex = 
graph.getJobVertex(jobVertexID).getTaskVertices()[0];
+        ExecutionAttemptID attemptID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        // create a savepoint, we can restore from later
+        final CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID);
+
+        // restore from a savepoint in NO_CLAIM mode
+        final StandaloneCompletedCheckpointStore checkpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+        checkpointCoordinator.restoreSavepoint(
+                SavepointRestoreSettings.forPath(
+                        savepoint.getExternalPointer(), true, 
RestoreMode.NO_CLAIM),
+                graph.getAllVertices(),
+                this.getClass().getClassLoader());
+        checkpointCoordinator.shutdown();
+
+        // imitate job manager failover
+        gateway.resetCount();
+        checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+        checkpointCoordinator.restoreLatestCheckpointedStateToAll(
+                new HashSet<>(graph.getAllVertices().values()), true);
+        checkpointCoordinator.startCheckpointScheduler();
+        final CompletableFuture<CompletedCheckpoint> checkpoint =
+                checkpointCoordinator.triggerCheckpoint(true);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkpointCoordinator.receiveAcknowledgeMessage(
+                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 2),
+                TASK_MANAGER_LOCATION_INFO);
+        checkpoint.get();
+
+        assertThat(
+                
gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
+                is(CheckpointType.FULL_CHECKPOINT));
+    }
+
+    @Test
+    public void testTriggeringFullCheckpoints() throws Exception {
+        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway 
gateway =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTaskManagerGateway(gateway)
+                        .build();
+
+        ExecutionVertex vertex = 
graph.getJobVertex(jobVertexID).getTaskVertices()[0];
+        ExecutionAttemptID attemptID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        // create a savepoint, we can restore from later
+        final CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID);
+
+        // restore from a savepoint in NO_CLAIM mode
+        final StandaloneCompletedCheckpointStore checkpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+        checkpointCoordinator.restoreSavepoint(
+                SavepointRestoreSettings.forPath(
+                        savepoint.getExternalPointer(), true, 
RestoreMode.NO_CLAIM),
+                graph.getAllVertices(),
+                this.getClass().getClassLoader());
+
+        // trigger a savepoint before any checkpoint completes
+        // next triggered checkpoint should still be a full one
+        takeSavepoint(graph, attemptID, checkpointCoordinator, 2);
+        checkpointCoordinator.startCheckpointScheduler();
+        gateway.resetCount();
+        // the checkpoint should be a FULL_CHECKPOINT
+        final CompletableFuture<CompletedCheckpoint> checkpoint =
+                checkpointCoordinator.triggerCheckpoint(true);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkpointCoordinator.receiveAcknowledgeMessage(
+                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 3),
+                TASK_MANAGER_LOCATION_INFO);
+        checkpoint.get();
+
+        assertThat(
+                
gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
+                is(CheckpointType.FULL_CHECKPOINT));
+    }
+
+    private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, 
ExecutionAttemptID attemptID)
+            throws Exception {
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(
+                        graph,
+                        new StandaloneCompletedCheckpointStore(1),
+                        new StandaloneCheckpointIDCounter());
+        final CompletedCheckpoint savepoint =
+                takeSavepoint(graph, attemptID, checkpointCoordinator, 1);
+        checkpointCoordinator.shutdown();
+        return savepoint;
+    }
+
+    private CompletedCheckpoint takeSavepoint(
+            ExecutionGraph graph,
+            ExecutionAttemptID attemptID,
+            CheckpointCoordinator checkpointCoordinator,
+            int savepointId)
+            throws Exception {
+        final CompletableFuture<CompletedCheckpoint> savepointFuture =
+                
checkpointCoordinator.triggerSavepoint(temporaryFolder.newFolder().getPath());
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkpointCoordinator.receiveAcknowledgeMessage(
+                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 
savepointId),
+                TASK_MANAGER_LOCATION_INFO);
+        return savepointFuture.get();
+    }
+
+    private CheckpointCoordinator createCheckpointCoordinator(
+            ExecutionGraph graph,
+            StandaloneCompletedCheckpointStore checkpointStore,
+            CheckpointIDCounter checkpointIDCounter)
+            throws Exception {
+        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
+                new CheckpointCoordinatorConfigurationBuilder()
+                        .setCheckpointInterval(
+                                10000) // periodic is ver long, we trigger 
checkpoint manually
+                        .setCheckpointTimeout(200000) // timeout is very long 
(200 s)
+                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                        .build();
+        return new CheckpointCoordinatorBuilder()
+                .setExecutionGraph(graph)
+                
.setCheckpointCoordinatorConfiguration(checkpointCoordinatorConfiguration)
+                .setCompletedCheckpointStore(checkpointStore)
+                .setCheckpointIDCounter(checkpointIDCounter)
+                .setTimer(manuallyTriggeredScheduledExecutor)
+                .build();
+    }
+
     /**
      * This test verified that after a completed checkpoint a certain time has 
passed before another
      * is triggered.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
index 156d187..6657f51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -112,16 +112,25 @@ public class CheckpointOptionsTest {
     public void testCheckpointIsTimeoutable() {
         CheckpointStorageLocationReference location =
                 CheckpointStorageLocationReference.getDefault();
-        assertTimeoutable(CheckpointOptions.alignedWithTimeout(location, 10), 
false, true, 10);
         assertTimeoutable(
-                CheckpointOptions.unaligned(location), true, false, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
+                
CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, location, 10),
+                false,
+                true,
+                10);
+        assertTimeoutable(
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
location),
+                true,
+                false,
+                NO_ALIGNED_CHECKPOINT_TIME_OUT);
         assertTimeoutable(
-                CheckpointOptions.alignedWithTimeout(location, 
10).withUnalignedUnsupported(),
+                
CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, location, 10)
+                        .withUnalignedUnsupported(),
                 false,
                 false,
                 10);
         assertTimeoutable(
-                
CheckpointOptions.unaligned(location).withUnalignedUnsupported(),
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
location)
+                        .withUnalignedUnsupported(),
                 false,
                 false,
                 NO_ALIGNED_CHECKPOINT_TIME_OUT);
@@ -131,8 +140,10 @@ public class CheckpointOptionsTest {
     public void testForceAlignmentIsReversable() {
         CheckpointStorageLocationReference location =
                 CheckpointStorageLocationReference.getDefault();
-        assertReversable(CheckpointOptions.alignedWithTimeout(location, 10), 
true);
-        assertReversable(CheckpointOptions.unaligned(location), true);
+        assertReversable(
+                
CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, location, 10),
+                true);
+        
assertReversable(CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
location), true);
 
         assertReversable(CheckpointOptions.alignedNoTimeout(CHECKPOINT, 
location), false);
         assertReversable(CheckpointOptions.alignedNoTimeout(SAVEPOINT, 
location), false);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
index 481559b6..4c1da95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -37,5 +37,7 @@ public class CheckpointTypeTest {
         assertEquals(0, CheckpointType.CHECKPOINT.ordinal());
         assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
         assertEquals(2, CheckpointType.SAVEPOINT_SUSPEND.ordinal());
+        assertEquals(3, CheckpointType.SAVEPOINT_TERMINATE.ordinal());
+        assertEquals(4, CheckpointType.FULL_CHECKPOINT.ordinal());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index de05c76..d618667 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -243,7 +243,7 @@ public class CompletedCheckpointTest {
 
         CheckpointProperties props =
                 new CheckpointProperties(
-                        false, CheckpointType.CHECKPOINT, true, false, false, 
false, false);
+                        false, CheckpointType.CHECKPOINT, true, false, false, 
false, false, false);
 
         CompletedCheckpoint checkpoint =
                 new CompletedCheckpoint(
@@ -289,7 +289,14 @@ public class CompletedCheckpointTest {
             // Keep
             CheckpointProperties retainProps =
                     new CheckpointProperties(
-                            false, CheckpointType.CHECKPOINT, false, false, 
false, false, false);
+                            false,
+                            CheckpointType.CHECKPOINT,
+                            false,
+                            false,
+                            false,
+                            false,
+                            false,
+                            false);
             CompletedCheckpoint checkpoint =
                     new CompletedCheckpoint(
                             new JobID(),
@@ -315,7 +322,7 @@ public class CompletedCheckpointTest {
             // Keep
             CheckpointProperties discardProps =
                     new CheckpointProperties(
-                            false, CheckpointType.CHECKPOINT, true, true, 
true, true, true);
+                            false, CheckpointType.CHECKPOINT, true, true, 
true, true, true, false);
 
             checkpoint =
                     new CompletedCheckpoint(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index bec75d5..1cacd09 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -122,7 +122,7 @@ public class PendingCheckpointTest {
         // Forced checkpoints cannot be subsumed
         CheckpointProperties forced =
                 new CheckpointProperties(
-                        true, CheckpointType.SAVEPOINT, false, false, false, 
false, false);
+                        true, CheckpointType.SAVEPOINT, false, false, false, 
false, false, false);
         PendingCheckpoint pending = createPendingCheckpoint(forced);
         assertFalse(pending.canBeSubsumed());
 
@@ -136,7 +136,7 @@ public class PendingCheckpointTest {
         // Non-forced checkpoints can be subsumed
         CheckpointProperties subsumed =
                 new CheckpointProperties(
-                        false, CheckpointType.SAVEPOINT, false, false, false, 
false, false);
+                        false, CheckpointType.SAVEPOINT, false, false, false, 
false, false, false);
         pending = createPendingCheckpoint(subsumed);
         assertFalse(pending.canBeSubsumed());
     }
@@ -164,7 +164,7 @@ public class PendingCheckpointTest {
     public void testCompletionFuture() throws Exception {
         CheckpointProperties props =
                 new CheckpointProperties(
-                        false, CheckpointType.SAVEPOINT, false, false, false, 
false, false);
+                        false, CheckpointType.SAVEPOINT, false, false, false, 
false, false, false);
 
         // Abort declined
         PendingCheckpoint pending = createPendingCheckpoint(props);
@@ -220,7 +220,7 @@ public class PendingCheckpointTest {
     public void testAbortDiscardsState() throws Exception {
         CheckpointProperties props =
                 new CheckpointProperties(
-                        false, CheckpointType.CHECKPOINT, false, false, false, 
false, false);
+                        false, CheckpointType.CHECKPOINT, false, false, false, 
false, false, false);
         QueueExecutor executor = new QueueExecutor();
 
         OperatorState state = mock(OperatorState.class);
@@ -379,7 +379,7 @@ public class PendingCheckpointTest {
     public void testSetCanceller() throws Exception {
         final CheckpointProperties props =
                 new CheckpointProperties(
-                        false, CheckpointType.CHECKPOINT, true, true, true, 
true, true);
+                        false, CheckpointType.CHECKPOINT, true, true, true, 
true, true, false);
 
         PendingCheckpoint aborted = createPendingCheckpoint(props);
         abort(aborted, CheckpointFailureReason.CHECKPOINT_DECLINED);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
index 1835851..8ef1dc7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
@@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
         long triggerTimestamp = Integer.MAX_VALUE + 1L;
         CheckpointProperties props =
                 new CheckpointProperties(
-                        true, CheckpointType.SAVEPOINT, false, false, true, 
false, true);
+                        true, CheckpointType.SAVEPOINT, false, false, true, 
false, true, false);
         long restoreTimestamp = Integer.MAX_VALUE + 1L;
         String externalPath = "external-path";
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 67f2d9a..0fd1aa9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -62,6 +62,12 @@ public class EventSerializerTest {
                 1678L,
                 4623784L,
                 new CheckpointOptions(
+                        CheckpointType.FULL_CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
                         CheckpointType.SAVEPOINT, 
CheckpointStorageLocationReference.getDefault())),
         new CheckpointBarrier(
                 1678L,
@@ -82,7 +88,9 @@ public class EventSerializerTest {
                         42L,
                         1337L,
                         CheckpointOptions.alignedWithTimeout(
-                                
CheckpointStorageLocationReference.getDefault(), 10)),
+                                CheckpointType.CHECKPOINT,
+                                
CheckpointStorageLocationReference.getDefault(),
+                                10)),
                 44),
         new SubtaskConnectionDescriptor(23, 42),
     };
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 9cbd96a..4088bd6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -363,6 +364,7 @@ public class PipelinedSubpartitionWithReadViewTest {
 
         CheckpointOptions options =
                 CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
                         new CheckpointStorageLocationReference(new byte[] {0, 
1, 2}));
         channelStateWriter.start(0, options);
         BufferConsumer barrierBuffer =
@@ -406,6 +408,7 @@ public class PipelinedSubpartitionWithReadViewTest {
 
         CheckpointOptions options =
                 CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
                         new CheckpointStorageLocationReference(new byte[] {0, 
1, 2}));
         BufferConsumer barrierBuffer =
                 EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, 
options), true);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
index c1d3cb3..bdbc57a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
@@ -49,7 +50,8 @@ public class ChannelStatePersisterTest {
                 new ChannelStatePersister(channelStateWriter, channelInfo);
 
         long checkpointId = 1L;
-        channelStateWriter.start(checkpointId, 
CheckpointOptions.unaligned(getDefault()));
+        channelStateWriter.start(
+                checkpointId, 
CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()));
 
         persister.checkForBarrier(barrier(checkpointId));
         persister.startPersisting(checkpointId, 
Arrays.asList(buildSomeBuffer()));
@@ -115,7 +117,8 @@ public class ChannelStatePersisterTest {
             persister.stopPersisting(lateCheckpointId);
         }
         persister.checkForBarrier(barrier(lateCheckpointId));
-        channelStateWriter.start(checkpointId, 
CheckpointOptions.unaligned(getDefault()));
+        channelStateWriter.start(
+                checkpointId, 
CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()));
         persister.startPersisting(checkpointId, 
Arrays.asList(buildSomeBuffer()));
         persister.maybePersist(buildSomeBuffer());
         persister.checkForBarrier(barrier(checkpointId));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index bc82390..30f355a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
@@ -99,7 +100,10 @@ public class LocalInputChannelTest {
     public void testNoDataPersistedAfterReceivingAlignedBarrier() throws 
Exception {
         CheckpointBarrier barrier =
                 new CheckpointBarrier(
-                        1L, 0L, 
CheckpointOptions.alignedWithTimeout(getDefault(), 123L));
+                        1L,
+                        0L,
+                        CheckpointOptions.alignedWithTimeout(
+                                CheckpointType.CHECKPOINT, getDefault(), 
123L));
         BufferConsumer barrierHolder = 
EventSerializer.toBufferConsumer(barrier, false);
         BufferConsumer data = createFilledFinishedBufferConsumer(1);
 
@@ -580,7 +584,8 @@ public class LocalInputChannelTest {
         channel.requestSubpartition(0);
 
         final CheckpointStorageLocationReference location = getDefault();
-        CheckpointOptions options = CheckpointOptions.unaligned(location);
+        CheckpointOptions options =
+                CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
location);
         stateWriter.start(0, options);
 
         final CheckpointBarrier barrier = new CheckpointBarrier(0, 123L, 
options);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
index f958f2d..99a562b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
@@ -45,7 +46,10 @@ public class RecoveredInputChannelTest {
 
     @Test(expected = CheckpointException.class)
     public void testCheckpointStartImpossible() throws CheckpointException {
-        buildChannel().checkpointStarted(new CheckpointBarrier(0L, 0L, 
unaligned(getDefault())));
+        buildChannel()
+                .checkpointStarted(
+                        new CheckpointBarrier(
+                                0L, 0L, unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
     }
 
     private RecoveredInputChannel buildChannel() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9c0418b..c3337d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -117,9 +118,10 @@ import static org.mockito.Mockito.when;
 public class RemoteInputChannelTest {
 
     private static final long CHECKPOINT_ID = 1L;
-    private static final CheckpointOptions UNALIGNED = 
CheckpointOptions.unaligned(getDefault());
+    private static final CheckpointOptions UNALIGNED =
+            CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault());
     private static final CheckpointOptions ALIGNED_WITH_TIMEOUT =
-            alignedWithTimeout(getDefault(), 10);
+            alignedWithTimeout(CheckpointType.CHECKPOINT, getDefault(), 10);
 
     @Test
     public void testGateNotifiedOnBarrierConversion() throws IOException, 
InterruptedException {
@@ -142,7 +144,12 @@ public class RemoteInputChannelTest {
             channel.onBuffer(
                     toBuffer(
                             new CheckpointBarrier(
-                                    1L, 123L, alignedWithTimeout(getDefault(), 
Integer.MAX_VALUE)),
+                                    1L,
+                                    123L,
+                                    alignedWithTimeout(
+                                            CheckpointType.CHECKPOINT,
+                                            getDefault(),
+                                            Integer.MAX_VALUE)),
                             false),
                     sequenceNumber,
                     0);
@@ -207,7 +214,9 @@ public class RemoteInputChannelTest {
 
         inputChannel.checkpointStarted(
                 new CheckpointBarrier(
-                        42, System.currentTimeMillis(), 
CheckpointOptions.unaligned(getDefault())));
+                        42,
+                        System.currentTimeMillis(),
+                        CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
 
         final Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE);
 
@@ -1437,7 +1446,12 @@ public class RemoteInputChannelTest {
         Buffer barrier =
                 EventSerializer.toBuffer(
                         new CheckpointBarrier(
-                                1L, 123L, alignedWithTimeout(getDefault(), 
Integer.MAX_VALUE)),
+                                1L,
+                                123L,
+                                alignedWithTimeout(
+                                        CheckpointType.CHECKPOINT,
+                                        getDefault(),
+                                        Integer.MAX_VALUE)),
                         false);
         remoteChannel1.onBuffer(barrier, 0, 0);
         remoteChannel2.onBuffer(barrier, 0, 0);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index ab2c4fb..ece8db7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -70,6 +70,7 @@ public class DummyEnvironment implements Environment {
     private final AccumulatorRegistry accumulatorRegistry =
             new AccumulatorRegistry(jobId, executionId);
     private UserCodeClassLoader userClassLoader;
+    private final Configuration taskConfiguration = new Configuration();
 
     public DummyEnvironment() {
         this("Test Job", 1, 0, 1);
@@ -122,7 +123,7 @@ public class DummyEnvironment implements Environment {
 
     @Override
     public Configuration getTaskConfiguration() {
-        return new Configuration();
+        return taskConfiguration;
     }
 
     @Override
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index bc1da60..7fa2e50 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -302,6 +302,13 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
     //  State backend methods
     // ------------------------------------------------------------------------
 
+    @Override
+    public boolean supportsNoClaimRestoreMode() {
+        // We are able to create CheckpointType#FULL_CHECKPOINT. (we might 
potentially reupload some
+        // shared files when taking incremental snapshots)
+        return true;
+    }
+
     private void lazyInitializeForJob(
             Environment env, @SuppressWarnings("unused") String 
operatorIdentifier)
             throws IOException {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3b0a71d..a3d5429 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -281,6 +281,13 @@ public class RocksDBStateBackend extends 
AbstractManagedMemoryStateBackend
         return checkpointStreamBackend;
     }
 
+    @Override
+    public boolean supportsNoClaimRestoreMode() {
+        // We are able to create CheckpointType#FULL_CHECKPOINT. (we might 
potentially reupload some
+        // shared files when taking incremental snapshots)
+        return true;
+    }
+
     // ------------------------------------------------------------------------
     //  Checkpoint initialization and persistent storage
     // ------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 7222ac9..bb96e01 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -64,6 +65,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -191,11 +193,27 @@ public class RocksIncrementalSnapshotStrategy<K>
             return registry -> SnapshotResult.empty();
         }
 
+        final PreviousSnapshot previousSnapshot;
+        final CheckpointType.SharingFilesStrategy sharingFilesStrategy =
+                
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+        switch (sharingFilesStrategy) {
+            case FORWARD_BACKWARD:
+                previousSnapshot = snapshotResources.previousSnapshot;
+                break;
+            case FORWARD:
+                previousSnapshot = EMPTY_PREVIOUS_SNAPSHOT;
+                break;
+            case NO_SHARING:
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported sharing files strategy: " + 
sharingFilesStrategy);
+        }
+
         return new RocksDBIncrementalSnapshotOperation(
                 checkpointId,
                 checkpointStreamFactory,
                 snapshotResources.snapshotDirectory,
-                snapshotResources.previousSnapshot,
+                previousSnapshot,
                 snapshotResources.stateMetaInfoSnapshots);
     }
 
@@ -589,6 +607,9 @@ public class RocksIncrementalSnapshotStrategy<K>
         }
     }
 
+    private static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
+            new PreviousSnapshot(Collections.emptySet(), 
Collections.emptyMap());
+
     private static class PreviousSnapshot {
 
         @Nullable private final Set<StateHandleID> confirmedSstFiles;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index f0715bc..1305284 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -116,6 +116,12 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
             } else {
                 return super.triggerCheckpointAsync(checkpointMetaData, 
checkpointOptions);
             }
+        } else if (checkpointOptions.getCheckpointType() == 
CheckpointType.FULL_CHECKPOINT) {
+            // see FLINK-25256
+            throw new IllegalStateException(
+                    "Using externally induced sources, we can not enforce 
taking a full checkpoint."
+                            + "If you are restoring from a snapshot in 
NO_CLAIM mode, please use"
+                            + " either CLAIM or LEGACY mode.");
         } else {
             return CompletableFuture.completedFuture(isRunning());
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e10934c..56903b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -258,6 +258,12 @@ public class SourceStreamTask<
             } else {
                 return super.triggerCheckpointAsync(checkpointMetaData, 
checkpointOptions);
             }
+        } else if (checkpointOptions.getCheckpointType() == 
CheckpointType.FULL_CHECKPOINT) {
+            // see FLINK-25256
+            throw new IllegalStateException(
+                    "Using externally induced sources, we can not enforce 
taking a full checkpoint."
+                            + "If you are restoring from a snapshot in 
NO_CLAIM mode, please use"
+                            + " either CLAIM or LEGACY mode.");
         } else {
             // we do not trigger checkpoints here, we simply state whether we 
can trigger them
             synchronized (lock) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 01fb55c..f8e7fd8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -54,6 +54,7 @@ import 
org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
 import org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
@@ -706,6 +707,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                     }
                 });
 
+        // We wait for all input channel state to recover before we go into 
RUNNING state, and thus
+        // start checkpointing. If we implement incremental checkpointing of 
input channel state
+        // we must make sure it supports CheckpointType#FULL_CHECKPOINT
         List<CompletableFuture<?>> recoveredFutures = new 
ArrayList<>(inputGates.length);
         for (InputGate inputGate : inputGates) {
             recoveredFutures.add(inputGate.getStateConsumedFuture());
@@ -1063,6 +1067,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions 
checkpointOptions) {
+        checkForcedFullSnapshotSupport(checkpointOptions);
 
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         mainMailboxExecutor.execute(
@@ -1271,6 +1276,21 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         }
     }
 
+    private void checkForcedFullSnapshotSupport(CheckpointOptions 
checkpointOptions) {
+        if (checkpointOptions.getCheckpointType() == 
CheckpointType.FULL_CHECKPOINT
+                && !stateBackend.supportsNoClaimRestoreMode()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Configured state backend (%s) does not support 
enforcing a full"
+                                    + " snapshot. If you are restoring in %s 
mode, please"
+                                    + " consider choosing either %s or %s 
restore mode.",
+                            stateBackend,
+                            RestoreMode.NO_CLAIM,
+                            RestoreMode.CLAIM,
+                            RestoreMode.LEGACY));
+        }
+    }
+
     protected void declineCheckpoint(long checkpointId) {
         getEnvironment()
                 .declineCheckpoint(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index 7cf2684..26d0e39 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -115,7 +115,9 @@ public class AlternatingCheckpointsTest {
             send(
                     toBuffer(
                             new CheckpointBarrier(
-                                    1, clock.relativeTimeMillis(), 
unaligned(getDefault())),
+                                    1,
+                                    clock.relativeTimeMillis(),
+                                    unaligned(CheckpointType.CHECKPOINT, 
getDefault())),
                             true),
                     1,
                     gate);
@@ -139,7 +141,8 @@ public class AlternatingCheckpointsTest {
                     new CheckpointBarrier(
                             1,
                             clock.relativeTimeMillis(),
-                            alignedWithTimeout(getDefault(), 
Integer.MAX_VALUE));
+                            alignedWithTimeout(
+                                    CheckpointType.CHECKPOINT, getDefault(), 
Integer.MAX_VALUE));
 
             send(
                     toBuffer(new EventAnnouncement(aligned, 0), true),
@@ -210,7 +213,8 @@ public class AlternatingCheckpointsTest {
                     barrier(
                             1,
                             clock.relativeTimeMillis(),
-                            alignedWithTimeout(getDefault(), 
alignmentTimeOut));
+                            alignedWithTimeout(
+                                    CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeOut));
             ((RemoteInputChannel) 
gate.getChannel(0)).onBuffer(barrier1.retainBuffer(), 0, 0);
             assertAnnouncement(gate);
             clock.advanceTime(alignmentTimeOut + 1, TimeUnit.MILLISECONDS);
@@ -221,7 +225,8 @@ public class AlternatingCheckpointsTest {
                     barrier(
                             2,
                             clock.relativeTimeMillis(),
-                            alignedWithTimeout(getDefault(), 
alignmentTimeOut));
+                            alignedWithTimeout(
+                                    CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeOut));
             ((RemoteInputChannel) 
gate.getChannel(0)).onBuffer(barrier2.retainBuffer(), 1, 0);
             assertAnnouncement(gate);
             assertBarrier(gate);
@@ -230,8 +235,9 @@ public class AlternatingCheckpointsTest {
             assertThat(
                     target.getTriggeredCheckpointOptions(),
                     contains(
-                            unaligned(getDefault()),
-                            alignedWithTimeout(getDefault(), 
alignmentTimeOut)));
+                            unaligned(CheckpointType.CHECKPOINT, getDefault()),
+                            alignedWithTimeout(
+                                    CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeOut)));
         }
     }
 
@@ -281,7 +287,8 @@ public class AlternatingCheckpointsTest {
                     barrier(
                             1,
                             clock.relativeTimeMillis(),
-                            alignedWithTimeout(getDefault(), 
Integer.MAX_VALUE)),
+                            alignedWithTimeout(
+                                    CheckpointType.CHECKPOINT, getDefault(), 
Integer.MAX_VALUE)),
                     2,
                     gate);
 
@@ -309,7 +316,9 @@ public class AlternatingCheckpointsTest {
         assertBarrier(gate);
         assertBarrier(gate);
         assertEquals(1, target.getTriggeredCheckpointCounter());
-        assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+        assertThat(
+                target.getTriggeredCheckpointOptions(),
+                contains(unaligned(CheckpointType.CHECKPOINT, getDefault())));
         // Followed by overtaken buffers
         assertData(gate);
         assertData(gate);
@@ -464,7 +473,9 @@ public class AlternatingCheckpointsTest {
             clock.executeCallables();
             assertBarrier(gate);
             assertEquals(1, target.getTriggeredCheckpointCounter());
-            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            assertThat(
+                    target.getTriggeredCheckpointOptions(),
+                    contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
             // Followed by overtaken buffers
             assertData(gate);
             assertData(gate);
@@ -572,7 +583,9 @@ public class AlternatingCheckpointsTest {
         send(checkpointBarrier, 0, gate);
 
         clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
-        assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+        assertThat(
+                target.getTriggeredCheckpointOptions(),
+                contains(unaligned(CheckpointType.CHECKPOINT, getDefault())));
     }
 
     @Test
@@ -591,7 +604,8 @@ public class AlternatingCheckpointsTest {
                 new CheckpointBarrier(
                         1,
                         clock.relativeTimeMillis(),
-                        alignedWithTimeout(getDefault(), alignmentTimeout));
+                        alignedWithTimeout(
+                                CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeout));
         Buffer checkpointBarrierBuffer = toBuffer(checkpointBarrier, false);
 
         // we set timer on announcement and test channels do not produce 
announcements by themselves
@@ -607,7 +621,9 @@ public class AlternatingCheckpointsTest {
         send(checkpointBarrierBuffer, 1, gate);
 
         assertThat(target.getTriggeredCheckpointOptions().size(), equalTo(1));
-        assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+        assertThat(
+                target.getTriggeredCheckpointOptions(),
+                contains(unaligned(CheckpointType.CHECKPOINT, getDefault())));
         assertFalse(((TestInputChannel) gate.getChannel(0)).isBlocked());
         assertFalse(((TestInputChannel) gate.getChannel(1)).isBlocked());
     }
@@ -630,7 +646,9 @@ public class AlternatingCheckpointsTest {
         send(checkpointBarrier, 1, gate);
         clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
 
-        assertThat(target.getTriggeredCheckpointOptions(), 
not(contains(unaligned(getDefault()))));
+        assertThat(
+                target.getTriggeredCheckpointOptions(),
+                not(contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault()))));
     }
 
     @Test
@@ -715,7 +733,9 @@ public class AlternatingCheckpointsTest {
             assertBarrier(gate);
             assertBarrier(gate);
             assertEquals(1, target.getTriggeredCheckpointCounter());
-            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            assertThat(
+                    target.getTriggeredCheckpointOptions(),
+                    contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
             // Followed by overtaken buffers
             assertData(gate);
             assertData(gate);
@@ -753,7 +773,9 @@ public class AlternatingCheckpointsTest {
             assertBarrier(gate);
             assertBarrier(gate);
             assertEquals(1, target.getTriggeredCheckpointCounter());
-            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            assertThat(
+                    target.getTriggeredCheckpointOptions(),
+                    contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
             // Followed by overtaken buffers
             assertData(gate);
             assertData(gate);
@@ -796,7 +818,9 @@ public class AlternatingCheckpointsTest {
             assertBarrier(gate);
             assertBarrier(gate);
             assertEquals(1, target.getTriggeredCheckpointCounter());
-            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            assertThat(
+                    target.getTriggeredCheckpointOptions(),
+                    contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
             // Followed by overtaken buffers
             assertData(gate);
             assertData(gate);
@@ -837,7 +861,9 @@ public class AlternatingCheckpointsTest {
             assertBarrier(gate);
 
             assertEquals(1, target.getTriggeredCheckpointCounter());
-            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            assertThat(
+                    target.getTriggeredCheckpointOptions(),
+                    contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
             // Followed by overtaken buffers
             assertData(gate);
         }
@@ -871,7 +897,7 @@ public class AlternatingCheckpointsTest {
                                         new CheckpointBarrier(
                                                 1,
                                                 clock.relativeTimeMillis(),
-                                                unaligned(getDefault())),
+                                                
unaligned(CheckpointType.CHECKPOINT, getDefault())),
                                         true)
                                 .retainBuffer(),
                         2,
@@ -906,7 +932,10 @@ public class AlternatingCheckpointsTest {
                             barrier(
                                     1,
                                     clock.relativeTimeMillis(),
-                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                                    alignedWithTimeout(
+                                            CheckpointType.CHECKPOINT,
+                                            getDefault(),
+                                            alignmentTimeOut)),
                             0,
                             0);
             assertAnnouncement(gate);
@@ -928,7 +957,9 @@ public class AlternatingCheckpointsTest {
             assertEvent(gate, EndOfPartitionEvent.class);
 
             assertEquals(1, target.getTriggeredCheckpointCounter());
-            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            assertThat(
+                    target.getTriggeredCheckpointOptions(),
+                    contains(unaligned(CheckpointType.CHECKPOINT, 
getDefault())));
         }
     }
 
@@ -956,7 +987,10 @@ public class AlternatingCheckpointsTest {
                             barrier(
                                     1,
                                     clock.relativeTimeMillis(),
-                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                                    alignedWithTimeout(
+                                            CheckpointType.CHECKPOINT,
+                                            getDefault(),
+                                            alignmentTimeOut)),
                             0,
                             0);
             getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
@@ -977,7 +1011,10 @@ public class AlternatingCheckpointsTest {
                             barrier(
                                     1,
                                     clock.relativeTimeMillis(),
-                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                                    alignedWithTimeout(
+                                            CheckpointType.CHECKPOINT,
+                                            getDefault(),
+                                            alignmentTimeOut)),
                             0,
                             0);
             assertAnnouncement(gate);
@@ -1048,13 +1085,25 @@ public class AlternatingCheckpointsTest {
 
         startNanos = clock.relativeTimeNanos();
         long checkpoint3CreationTime = clock.relativeTimeMillis() - 7;
-        send(barrier(3, checkpoint3CreationTime, unaligned(getDefault())), 0, 
gate);
+        send(
+                barrier(
+                        3,
+                        checkpoint3CreationTime,
+                        unaligned(CheckpointType.CHECKPOINT, getDefault())),
+                0,
+                gate);
         sendData(bufferSize, 0, gate);
         sendData(bufferSize, 1, gate);
         assertMetrics(
                 target, gate.getCheckpointBarrierHandler(), 3L, startNanos, 
0L, 7_000_000L, -1L);
         clock.advanceTime(10, TimeUnit.MILLISECONDS);
-        send(barrier(3, checkpoint2CreationTime, unaligned(getDefault())), 1, 
gate);
+        send(
+                barrier(
+                        3,
+                        checkpoint2CreationTime,
+                        unaligned(CheckpointType.CHECKPOINT, getDefault())),
+                1,
+                gate);
         assertMetrics(
                 target,
                 gate.getCheckpointBarrierHandler(),
@@ -1260,7 +1309,7 @@ public class AlternatingCheckpointsTest {
         CheckpointOptions options =
                 checkpointType.isSavepoint()
                         ? alignedNoTimeout(checkpointType, getDefault())
-                        : unaligned(getDefault());
+                        : unaligned(CheckpointType.CHECKPOINT, getDefault());
         Buffer barrier = barrier(barrierId, 1, options);
         send(barrier.retainBuffer(), fast, checkpointedGate);
         assertEquals(checkpointType.isSavepoint(), 
target.triggeredCheckpoints.isEmpty());
@@ -1318,7 +1367,8 @@ public class AlternatingCheckpointsTest {
         return barrier(
                 checkpointId,
                 clock.relativeTimeMillis(),
-                alignedWithTimeout(getDefault(), alignedCheckpointTimeout));
+                alignedWithTimeout(
+                        CheckpointType.CHECKPOINT, getDefault(), 
alignedCheckpointTimeout));
     }
 
     private Buffer barrier(long barrierId, long barrierTimestamp, 
CheckpointOptions options)
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
index ae531f15..ad3857f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.MockChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
@@ -270,7 +271,9 @@ public class CheckpointedInputGateTest {
         return new CheckpointBarrier(
                 barrierId,
                 barrierId,
-                
CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));
+                CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault()));
     }
 
     private void assertAddedInputSize(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
index 00bf050..b3d417e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.MockChannelStateWriter;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -105,6 +106,7 @@ public class InputProcessorUtilTest {
                                     1,
                                     42,
                                     CheckpointOptions.unaligned(
+                                            CheckpointType.CHECKPOINT,
                                             
CheckpointStorageLocationReference.getDefault())),
                             new InputChannelInfo(inputGate.getGateIndex(), 
channelId),
                             false);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
index c1b348e..9cf01a6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -899,7 +900,9 @@ public class UnalignedCheckpointsTest {
         sizeCounter++;
         return new BufferOrEvent(
                 new CheckpointBarrier(
-                        checkpointId, timestamp, 
CheckpointOptions.unaligned(getDefault())),
+                        checkpointId,
+                        timestamp,
+                        CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault())),
                 new InputChannelInfo(0, channel));
     }
 
@@ -1055,7 +1058,8 @@ public class UnalignedCheckpointsTest {
     }
 
     private CheckpointBarrier buildCheckpointBarrier(long id) {
-        return new CheckpointBarrier(id, 0, 
CheckpointOptions.unaligned(getDefault()));
+        return new CheckpointBarrier(
+                id, 0, CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
getDefault()));
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
index 521dbd0..46bc3a1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
@@ -366,7 +366,9 @@ public class 
MultipleInputStreamTaskChainedSourcesCheckpointingTest {
     public void 
testTriggerUnalignedCheckpointWithFinishedChannelsAndSourceChain()
             throws Exception {
         testTriggerCheckpointWithFinishedChannelsAndSourceChain(
-                
CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));
+                CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault()));
     }
 
     @Test
@@ -374,7 +376,9 @@ public class 
MultipleInputStreamTaskChainedSourcesCheckpointingTest {
             throws Exception {
         testTriggerCheckpointWithFinishedChannelsAndSourceChain(
                 CheckpointOptions.alignedWithTimeout(
-                        CheckpointStorageLocationReference.getDefault(), 10L));
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault(),
+                        10L));
     }
 
     private void testTriggerCheckpointWithFinishedChannelsAndSourceChain(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 682e873..2d8866c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -933,14 +933,18 @@ public class MultipleInputStreamTaskTest {
     @Test
     public void testTriggeringUnalignedCheckpointWithFinishedChannels() throws 
Exception {
         testTriggeringCheckpointWithFinishedChannels(
-                
CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));
+                CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault()));
     }
 
     @Test
     public void 
testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws 
Exception {
         testTriggeringCheckpointWithFinishedChannels(
                 CheckpointOptions.alignedWithTimeout(
-                        CheckpointStorageLocationReference.getDefault(), 10L));
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault(),
+                        10L));
     }
 
     private void 
testTriggeringCheckpointWithFinishedChannels(CheckpointOptions 
checkpointOptions)
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 77df7e7..8780e70 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -507,14 +507,18 @@ public class StreamTaskFinalCheckpointsTest {
     @Test
     public void testTriggeringUnalignedCheckpointWithFinishedChannels() throws 
Exception {
         testTriggeringCheckpointWithFinishedChannels(
-                
CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));
+                CheckpointOptions.unaligned(
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault()));
     }
 
     @Test
     public void 
testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws 
Exception {
         testTriggeringCheckpointWithFinishedChannels(
                 CheckpointOptions.alignedWithTimeout(
-                        CheckpointStorageLocationReference.getDefault(), 10L));
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault(),
+                        10L));
     }
 
     private void 
testTriggeringCheckpointWithFinishedChannels(CheckpointOptions 
checkpointOptions)
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 77ddadd..88e4195 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
 import org.apache.flink.runtime.taskmanager.AsynchronousException;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
@@ -198,6 +199,7 @@ import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.nullable;
@@ -625,6 +627,39 @@ public class StreamTaskTest extends TestLogger {
         task.waitForTaskCompletion(false);
     }
 
+    @Test
+    public void testForceFullSnapshotOnIncompatibleStateBackend() throws 
Exception {
+        try (StreamTaskMailboxTestHarness<Integer> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .modifyStreamConfig(
+                                config -> config.setStateBackend(new 
OnlyIncrementalStateBackend()))
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(new 
StreamMap<>(value -> null))
+                        .build()) {
+            final IllegalStateException exception =
+                    assertThrows(
+                            IllegalStateException.class,
+                            () -> {
+                                harness.streamTask.triggerCheckpointAsync(
+                                        new CheckpointMetaData(42L, 1L),
+                                        CheckpointOptions.forConfig(
+                                                CheckpointType.FULL_CHECKPOINT,
+                                                getDefault(),
+                                                true,
+                                                false,
+                                                0L));
+                            });
+            assertThat(
+                    exception.getMessage(),
+                    equalTo(
+                            "Configured state backend 
(OnlyIncrementalStateBackend) does not"
+                                    + " support enforcing a full snapshot. If 
you are restoring in"
+                                    + " NO_CLAIM mode, please consider 
choosing either CLAIM or"
+                                    + " LEGACY restore mode."));
+        }
+    }
+
     /**
      * Tests that in case of a failing AsyncCheckpointRunnable all operator 
snapshot results are
      * cancelled and all non partitioned state handles are discarded.
@@ -2658,4 +2693,16 @@ public class StreamTaskTest extends TestLogger {
             return notifiedCheckpoint;
         }
     }
+
+    private static final class OnlyIncrementalStateBackend extends 
MockStateBackend {
+        @Override
+        public boolean supportsNoClaimRestoreMode() {
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "OnlyIncrementalStateBackend";
+        }
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index ab7a6fd..1674808 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -114,7 +114,7 @@ public class SubtaskCheckpointCoordinatorTest {
         coordinator.initInputsCheckpoint(
                 1L,
                 unalignedCheckpointEnabled
-                        ? CheckpointOptions.unaligned(locationReference)
+                        ? 
CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, locationReference)
                         : CheckpointOptions.alignedNoTimeout(checkpointType, 
locationReference));
         return writer.started;
     }
@@ -216,7 +216,9 @@ public class SubtaskCheckpointCoordinatorTest {
                     };
 
             CheckpointOptions forcedAlignedOptions =
-                    
CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())
+                    CheckpointOptions.unaligned(
+                                    CheckpointType.CHECKPOINT,
+                                    
CheckpointStorageLocationReference.getDefault())
                             .withUnalignedUnsupported();
             coordinator.checkpointState(
                     new CheckpointMetaData(checkpointId, 0),
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q 
b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index cc8ff64..361c360 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -39,7 +39,7 @@ CREATE TABLE hive_table (
 # list the configured configuration
 set;
 'execution.attached' = 'true'
-'execution.savepoint-restore-mode' = 'LEGACY'
+'execution.savepoint-restore-mode' = 'NO_CLAIM'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -58,7 +58,7 @@ reset;
 
 set;
 'execution.attached' = 'true'
-'execution.savepoint-restore-mode' = 'LEGACY'
+'execution.savepoint-restore-mode' = 'NO_CLAIM'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -92,7 +92,7 @@ Was expecting one of:
 
 set;
 'execution.attached' = 'true'
-'execution.savepoint-restore-mode' = 'LEGACY'
+'execution.savepoint-restore-mode' = 'NO_CLAIM'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -113,7 +113,7 @@ reset 'execution.attached';
 
 set;
 'execution.attached' = 'true'
-'execution.savepoint-restore-mode' = 'LEGACY'
+'execution.savepoint-restore-mode' = 'NO_CLAIM'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -135,7 +135,7 @@ $VAR_UDF_JAR_PATH
 
 set;
 'execution.attached' = 'true'
-'execution.savepoint-restore-mode' = 'LEGACY'
+'execution.savepoint-restore-mode' = 'NO_CLAIM'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 16b0023..3320c66 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
@@ -39,6 +41,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -67,7 +70,9 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -82,7 +87,10 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
 import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -106,6 +114,7 @@ import java.net.URISyntaxException;
 import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -354,6 +363,107 @@ public class SavepointITCase extends TestLogger {
                 });
     }
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+    @Test
+    public void testTriggerSavepointAndResumeWithNoClaim() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
+
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
+        env.getCheckpointConfig()
+                .enableExternalizedCheckpoints(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        
env.getCheckpointConfig().setCheckpointStorage(folder.newFolder().toURI());
+        env.setParallelism(parallelism);
+
+        final SharedReference<CountDownLatch> counter =
+                sharedObjects.add(new CountDownLatch(10_000));
+        env.fromSequence(1, Long.MAX_VALUE)
+                .keyBy(i -> i % parallelism)
+                .process(
+                        new KeyedProcessFunction<Long, Long, Long>() {
+                            private ListState<Long> last;
+
+                            @Override
+                            public void open(Configuration parameters) {
+                                // we use list state here to create sst files 
of a significant size
+                                // if sst files do not reach certain 
thresholds they are not stored
+                                // in files, but as a byte stream in 
checkpoints metadata
+                                last =
+                                        getRuntimeContext()
+                                                .getListState(
+                                                        new 
ListStateDescriptor<>(
+                                                                "last",
+                                                                
BasicTypeInfo.LONG_TYPE_INFO));
+                            }
+
+                            @Override
+                            public void processElement(
+                                    Long value,
+                                    KeyedProcessFunction<Long, Long, 
Long>.Context ctx,
+                                    Collector<Long> out)
+                                    throws Exception {
+                                last.add(value);
+                                out.collect(value);
+                            }
+                        })
+                .addSink(
+                        new SinkFunction<Long>() {
+                            @Override
+                            public void invoke(Long value) {
+                                counter.consumeSync(CountDownLatch::countDown);
+                            }
+                        })
+                .setParallelism(1);
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(numTaskManagers)
+                                
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+                                .build());
+        cluster.before();
+        try {
+            final JobID jobID1 = new JobID();
+            jobGraph.setJobID(jobID1);
+            cluster.getClusterClient().submitJob(jobGraph).get();
+            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), 
jobID1, false);
+            // wait for some records to be processed before taking the 
checkpoint
+            counter.get().await();
+            final String firstCheckpoint = 
cluster.getMiniCluster().triggerCheckpoint(jobID1).get();
+
+            cluster.getClusterClient().cancel(jobID1).get();
+            jobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(firstCheckpoint, false, 
RestoreMode.NO_CLAIM));
+            final JobID jobID2 = new JobID();
+            jobGraph.setJobID(jobID2);
+            cluster.getClusterClient().submitJob(jobGraph).get();
+            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), 
jobID2, false);
+            String secondCheckpoint = 
cluster.getMiniCluster().triggerCheckpoint(jobID2).get();
+            cluster.getClusterClient().cancel(jobID2).get();
+
+            // delete the checkpoint we restored from
+            FileUtils.deleteDirectory(Paths.get(new 
URI(firstCheckpoint)).getParent().toFile());
+
+            // we should be able to restore from the second checkpoint even 
though it has been built
+            // on top of the first checkpoint
+            jobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(
+                            secondCheckpoint, false, RestoreMode.NO_CLAIM));
+            final JobID jobID3 = new JobID();
+            jobGraph.setJobID(jobID3);
+            cluster.getClusterClient().submitJob(jobGraph).get();
+            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), 
jobID3, false);
+        } finally {
+            cluster.after();
+        }
+    }
+
     @Test
     public void 
testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath()
             throws Exception {

Reply via email to