This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7adeecd3445 [FLINK-34510][Runtime/State]Rename RestoreMode to 
RecoveryClaimMode (#25192)
7adeecd3445 is described below

commit 7adeecd3445947f42d3e3d1e2961b9464e910236
Author: lz <[email protected]>
AuthorDate: Tue Sep 10 10:32:06 2024 +0800

    [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192)
---
 docs/static/generated/rest_v1_dispatcher.yml       | 16 ++---
 .../apache/flink/client/cli/CliFrontendParser.java | 16 ++---
 .../apache/flink/client/cli/ProgramOptions.java    |  6 +-
 .../flink/client/cli/CliFrontendRunTest.java       | 18 ++---
 .../StandaloneApplicationClusterEntryPoint.java    | 12 ++--
 .../flink/configuration/StateRecoveryOptions.java  |  8 +--
 .../{RestoreMode.java => RecoveryClaimMode.java}   |  8 +--
 .../KubernetesCheckpointRecoveryFactory.java       |  6 +-
 .../flink/kubernetes/utils/KubernetesUtils.java    |  8 +--
 .../runtime/webmonitor/handlers/JarRunHandler.java | 19 +++---
 .../webmonitor/handlers/JarRunRequestBody.java     | 25 +++----
 .../handlers/JarRunHandlerParameterTest.java       |  6 +-
 .../webmonitor/handlers/JarRunRequestBodyTest.java |  6 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  4 +-
 .../runtime/checkpoint/CheckpointProperties.java   | 16 +++--
 .../checkpoint/CheckpointRecoveryFactory.java      |  6 +-
 .../runtime/checkpoint/CompletedCheckpoint.java    |  8 +--
 .../EmbeddedCompletedCheckpointStore.java          |  8 +--
 .../PerJobCheckpointRecoveryFactory.java           | 10 +--
 .../StandaloneCheckpointRecoveryFactory.java       |  6 +-
 .../StandaloneCompletedCheckpointStore.java        | 14 ++--
 .../ZooKeeperCheckpointRecoveryFactory.java        |  6 +-
 .../cleanup/CheckpointResourcesCleanupRunner.java  |  6 +-
 .../EmbeddedHaServicesWithLeadershipControl.java   |  4 +-
 .../runtime/jobgraph/SavepointConfigOptions.java   |  8 +--
 .../runtime/jobgraph/SavepointRestoreSettings.java | 41 +++++++-----
 .../flink/runtime/minicluster/MiniCluster.java     |  6 +-
 .../flink/runtime/scheduler/SchedulerUtils.java    |  8 +--
 .../flink/runtime/state/SharedStateRegistry.java   | 13 ++--
 .../runtime/state/SharedStateRegistryFactory.java  |  6 +-
 .../runtime/state/SharedStateRegistryImpl.java     |  6 +-
 .../apache/flink/runtime/state/StateBackend.java   |  6 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  8 +--
 .../flink/streaming/runtime/tasks/StreamTask.java  |  4 +-
 .../CheckpointCoordinatorFailureTest.java          |  4 +-
 .../CheckpointCoordinatorRestoringTest.java        |  8 ++-
 .../checkpoint/CheckpointCoordinatorTest.java      | 10 +--
 .../CheckpointCoordinatorTriggeringTest.java       |  8 +--
 .../checkpoint/CompletedCheckpointTest.java        |  8 ++-
 .../DefaultCompletedCheckpointStoreTest.java       |  4 +-
 .../checkpoint/PerJobCheckpointRecoveryTest.java   | 10 +--
 .../TestingCheckpointRecoveryFactory.java          |  4 +-
 .../ZooKeeperCompletedCheckpointStoreITCase.java   |  4 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     |  4 +-
 .../dispatcher/DispatcherCleanupITCase.java        |  8 +--
 .../CheckpointResourcesCleanupRunnerTest.java      |  4 +-
 .../jobgraph/SavepointRestoreSettingsTest.java     |  6 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      | 10 +--
 .../runtime/state/SharedStateRegistryTest.java     |  4 +-
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  4 +-
 .../ChangelogRecoverySwitchStateBackendITCase.java |  4 +-
 .../ResumeCheckpointManuallyITCase.java            | 77 ++++++++++++----------
 .../test/checkpointing/SavepointFormatITCase.java  |  4 +-
 .../flink/test/checkpointing/SavepointITCase.java  | 11 ++--
 .../SnapshotFileMergingCompatibilityITCase.java    | 48 ++++++++------
 55 files changed, 313 insertions(+), 279 deletions(-)

diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 1ea036fc53b..ccaa7c6bc24 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2288,7 +2288,7 @@ components:
         allowNonRestoredState:
           type: boolean
         claimMode:
-          $ref: '#/components/schemas/RestoreMode'
+          $ref: '#/components/schemas/RecoveryClaimMode'
         entryClass:
           type: string
         flinkConfiguration:
@@ -2307,7 +2307,7 @@ components:
           items:
             type: string
         restoreMode:
-          $ref: '#/components/schemas/RestoreMode'
+          $ref: '#/components/schemas/RecoveryClaimMode'
         savepointPath:
           type: string
     JarRunResponseBody:
@@ -2773,6 +2773,12 @@ components:
           $ref: '#/components/schemas/Id'
     RawJson:
       type: object
+    RecoveryClaimMode:
+      type: string
+      enum:
+      - CLAIM
+      - NO_CLAIM
+      - LEGACY
     ResourceID:
       pattern: "[0-9a-f]{32}"
       type: string
@@ -2806,12 +2812,6 @@ components:
       - UNALIGNED_CHECKPOINT
       - SAVEPOINT
       - SYNC_SAVEPOINT
-    RestoreMode:
-      type: string
-      enum:
-      - CLAIM
-      - NO_CLAIM
-      - LEGACY
     RestoredCheckpointStatistics:
       type: object
       properties:
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index f7c0c4c0076..6730dceb29d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -21,7 +21,7 @@ package org.apache.flink.client.cli;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.StateRecoveryOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
@@ -697,25 +697,25 @@ public class CliFrontendParser {
             String savepointPath = 
commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
             boolean allowNonRestoredState =
                     
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
-            final RestoreMode restoreMode;
+            final RecoveryClaimMode recoveryClaimMode;
             if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
-                restoreMode =
+                recoveryClaimMode =
                         ConfigurationUtils.convertValue(
                                 
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
-                                RestoreMode.class);
+                                RecoveryClaimMode.class);
             } else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
-                restoreMode =
+                recoveryClaimMode =
                         ConfigurationUtils.convertValue(
                                 
commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE),
-                                RestoreMode.class);
+                                RecoveryClaimMode.class);
                 System.out.printf(
                         "The option '%s' is deprecated. Please use '%s' 
instead.%n",
                         SAVEPOINT_RESTORE_MODE.getLongOpt(), 
SAVEPOINT_CLAIM_MODE.getLongOpt());
             } else {
-                restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue();
+                recoveryClaimMode = 
StateRecoveryOptions.RESTORE_MODE.defaultValue();
             }
             return SavepointRestoreSettings.forPath(
-                    savepointPath, allowNonRestoredState, restoreMode);
+                    savepointPath, allowNonRestoredState, recoveryClaimMode);
         } else {
             return SavepointRestoreSettings.none();
         }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 2bec145bb81..7a69ad99c4d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
@@ -139,11 +139,11 @@ public class ProgramOptions extends CommandLineOptions {
         if (getJarFilePath() == null) {
             throw new CliArgsException("Java program should be specified a JAR 
file.");
         }
-        if (savepointSettings.getRestoreMode().equals(RestoreMode.LEGACY)) {
+        if 
(savepointSettings.getRecoveryClaimMode().equals(RecoveryClaimMode.LEGACY)) {
             System.out.printf(
                     "Warning: The %s restore mode is deprecated, please use %s 
or"
                             + " %s mode instead.%n",
-                    RestoreMode.LEGACY, RestoreMode.CLAIM, 
RestoreMode.NO_CLAIM);
+                    RecoveryClaimMode.LEGACY, RecoveryClaimMode.CLAIM, 
RecoveryClaimMode.NO_CLAIM);
         }
     }
 
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index af965f60f60..dad03880371 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.TestingClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
 
@@ -139,35 +139,35 @@ public class CliFrontendRunTest extends 
CliFrontendTestBase {
 
     @Test
     void testClaimRestoreModeParsing() throws Exception {
-        testRestoreMode("-rm", "claim", RestoreMode.CLAIM);
+        testRestoreMode("-rm", "claim", RecoveryClaimMode.CLAIM);
     }
 
     @Test
     void testLegacyRestoreModeParsing() throws Exception {
-        testRestoreMode("-rm", "legacy", RestoreMode.LEGACY);
+        testRestoreMode("-rm", "legacy", RecoveryClaimMode.LEGACY);
     }
 
     @Test
     void testNoClaimRestoreModeParsing() throws Exception {
-        testRestoreMode("-rm", "no_claim", RestoreMode.NO_CLAIM);
+        testRestoreMode("-rm", "no_claim", RecoveryClaimMode.NO_CLAIM);
     }
 
     @Test
     void testClaimRestoreModeParsingLongOption() throws Exception {
-        testRestoreMode("--claimMode", "claim", RestoreMode.CLAIM);
+        testRestoreMode("--claimMode", "claim", RecoveryClaimMode.CLAIM);
     }
 
     @Test
     void testLegacyRestoreModeParsingLongOption() throws Exception {
-        testRestoreMode("--claimMode", "legacy", RestoreMode.LEGACY);
+        testRestoreMode("--claimMode", "legacy", RecoveryClaimMode.LEGACY);
     }
 
     @Test
     void testNoClaimRestoreModeParsingLongOption() throws Exception {
-        testRestoreMode("--claimMode", "no_claim", RestoreMode.NO_CLAIM);
+        testRestoreMode("--claimMode", "no_claim", RecoveryClaimMode.NO_CLAIM);
     }
 
-    private void testRestoreMode(String flag, String arg, RestoreMode 
expectedMode)
+    private void testRestoreMode(String flag, String arg, RecoveryClaimMode 
expectedMode)
             throws Exception {
         String[] parameters = {"-s", "expectedSavepointPath", "-n", flag, arg, 
getTestJarPath()};
 
@@ -179,7 +179,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase 
{
 
         SavepointRestoreSettings savepointSettings = 
executionOptions.getSavepointRestoreSettings();
         assertThat(savepointSettings.restoreSavepoint()).isTrue();
-        assertThat(savepointSettings.getRestoreMode()).isEqualTo(expectedMode);
+        
assertThat(savepointSettings.getRecoveryClaimMode()).isEqualTo(expectedMode);
         
assertThat(savepointSettings.getRestorePath()).isEqualTo("expectedSavepointPath");
         assertThat(savepointSettings.allowNonRestoredState()).isTrue();
     }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
index 8d1f731df07..1dbd12b6cfc 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.client.program.PackagedProgramRetriever;
 import org.apache.flink.client.program.artifact.ArtifactFetchManager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -69,13 +69,13 @@ public final class StandaloneApplicationClusterEntryPoint 
extends ApplicationClu
         Configuration configuration = 
loadConfigurationFromClusterConfig(clusterConfiguration);
         if (clusterConfiguration
                 .getSavepointRestoreSettings()
-                .getRestoreMode()
-                .equals(RestoreMode.LEGACY)) {
+                .getRecoveryClaimMode()
+                .equals(RecoveryClaimMode.LEGACY)) {
             LOG.warn(
                     "The {} restore mode is deprecated, please use {} or {} 
mode instead.",
-                    RestoreMode.LEGACY,
-                    RestoreMode.CLAIM,
-                    RestoreMode.NO_CLAIM);
+                    RecoveryClaimMode.LEGACY,
+                    RecoveryClaimMode.CLAIM,
+                    RecoveryClaimMode.NO_CLAIM);
         }
         PackagedProgram program = null;
         try {
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
index 3dd0d339602..e403b0c9c12 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -57,10 +57,10 @@ public class StateRecoveryOptions {
     /**
      * Describes the mode how Flink should restore from the given savepoint or 
retained checkpoint.
      */
-    public static final ConfigOption<RestoreMode> RESTORE_MODE =
+    public static final ConfigOption<RecoveryClaimMode> RESTORE_MODE =
             key("execution.state-recovery.claim-mode")
-                    .enumType(RestoreMode.class)
-                    .defaultValue(RestoreMode.DEFAULT)
+                    .enumType(RecoveryClaimMode.class)
+                    .defaultValue(RecoveryClaimMode.DEFAULT)
                     .withDeprecatedKeys("execution.savepoint-restore-mode")
                     .withDescription(
                             "Describes the mode how Flink should restore from 
the given"
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/RestoreMode.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/RecoveryClaimMode.java
similarity index 90%
rename from 
flink-core/src/main/java/org/apache/flink/core/execution/RestoreMode.java
rename to 
flink-core/src/main/java/org/apache/flink/core/execution/RecoveryClaimMode.java
index 8a4abb6e6bf..e4393890c5a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/RestoreMode.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/RecoveryClaimMode.java
@@ -27,11 +27,9 @@ import static 
org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * Defines state files ownership when Flink restore from a given savepoint or 
retained checkpoint.
- * TODO: Rename 'RestoreMode' to 'RecoveryClaimMode' in Flink 2.0. Any related 
variable names should
- * be adjusted accordingly.
  */
 @PublicEvolving
-public enum RestoreMode implements DescribedEnum {
+public enum RecoveryClaimMode implements DescribedEnum {
     CLAIM(
             "Flink will take ownership of the given snapshot. It will clean 
the"
                     + " snapshot once it is subsumed by newer ones."),
@@ -51,7 +49,7 @@ public enum RestoreMode implements DescribedEnum {
 
     private final String description;
 
-    RestoreMode(String description) {
+    RecoveryClaimMode(String description) {
         this.description = description;
     }
 
@@ -61,5 +59,5 @@ public enum RestoreMode implements DescribedEnum {
         return text(description);
     }
 
-    public static final RestoreMode DEFAULT = NO_CLAIM;
+    public static final RecoveryClaimMode DEFAULT = NO_CLAIM;
 }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index b908827ec8a..1a8468049e2 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -83,7 +83,7 @@ public class KubernetesCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
         final String configMapName = getConfigMapNameFunction.apply(jobID);
         KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, 
configMapName, clusterId);
@@ -97,7 +97,7 @@ public class KubernetesCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
                 ioExecutor,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Override
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index d27515b8221..09ff6653d7f 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesCheckpointStoreUtil;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesJobGraphStoreUtil;
@@ -305,7 +305,7 @@ public class KubernetesUtils {
      * @param lockIdentity lock identity to check the leadership
      * @param maxNumberOfCheckpointsToRetain max number of checkpoints to 
retain on state store
      *     handle
-     * @param restoreMode the mode in which the job is restoring
+     * @param recoveryClaimMode the mode in which the job is restoring
      * @return a {@link DefaultCompletedCheckpointStore} with {@link 
KubernetesStateHandleStore}.
      * @throws Exception when create the storage helper failed
      */
@@ -318,7 +318,7 @@ public class KubernetesUtils {
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
 
         final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage =
@@ -342,7 +342,7 @@ public class KubernetesUtils {
                 stateHandleStore,
                 KubernetesCheckpointStoreUtil.INSTANCE,
                 checkpoints,
-                sharedStateRegistryFactory.create(ioExecutor, checkpoints, 
restoreMode),
+                sharedStateRegistryFactory.create(ioExecutor, checkpoints, 
recoveryClaimMode),
                 executor);
     }
 
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index f0835b223cb..865db07fda3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -149,27 +149,28 @@ public class JarRunHandler
                                                 request, 
SavepointPathQueryParameter.class)),
                         
effectiveConfiguration.get(StateRecoveryOptions.SAVEPOINT_PATH),
                         log);
-        final RestoreMode restoreMode =
-                Optional.ofNullable(requestBody.getRestoreMode())
+        final RecoveryClaimMode recoveryClaimMode =
+                Optional.ofNullable(requestBody.getRecoveryClaimMode())
                         .orElseGet(
                                 () ->
                                         effectiveConfiguration.get(
                                                 
StateRecoveryOptions.RESTORE_MODE));
         if (requestBody.isDeprecatedRestoreModeHasValue()) {
-            log.warn("The option 'restoreMode' is deprecated, please use 
'claimMode' instead.");
+            log.warn(
+                    "The option 'restoreMode' is deprecated, please use 
'recoveryClaimMode' instead.");
         }
-        if (restoreMode.equals(RestoreMode.LEGACY)) {
+        if (recoveryClaimMode.equals(RecoveryClaimMode.LEGACY)) {
             log.warn(
                     "The {} restore mode is deprecated, please use {} or {} 
mode instead.",
-                    RestoreMode.LEGACY,
-                    RestoreMode.CLAIM,
-                    RestoreMode.NO_CLAIM);
+                    RecoveryClaimMode.LEGACY,
+                    RecoveryClaimMode.CLAIM,
+                    RecoveryClaimMode.NO_CLAIM);
         }
         final SavepointRestoreSettings savepointRestoreSettings;
         if (savepointPath != null) {
             savepointRestoreSettings =
                     SavepointRestoreSettings.forPath(
-                            savepointPath, allowNonRestoredState, restoreMode);
+                            savepointPath, allowNonRestoredState, 
recoveryClaimMode);
         } else {
             savepointRestoreSettings = SavepointRestoreSettings.none();
         }
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
index cc0fbf92b4e..948d99e608f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -57,11 +57,11 @@ public class JarRunRequestBody extends JarRequestBody {
     @Nullable
     @Deprecated
     @Documentation.ExcludeFromDocumentation
-    private RestoreMode deprecatedRestoreMode;
+    private RecoveryClaimMode deprecatedRecoveryClaimMode;
 
     @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
     @Nullable
-    private RestoreMode restoreMode;
+    private RecoveryClaimMode recoveryClaimMode;
 
     public JarRunRequestBody() {
         this(null, null, null, null, null, null, null, null, null, null);
@@ -77,7 +77,7 @@ public class JarRunRequestBody extends JarRequestBody {
             @Nullable JobID jobId,
             @Nullable Boolean allowNonRestoredState,
             @Nullable String savepointPath,
-            @Nullable RestoreMode restoreMode,
+            @Nullable RecoveryClaimMode recoveryClaimMode,
             @Nullable Map<String, String> flinkConfiguration) {
         this(
                 entryClassName,
@@ -88,7 +88,7 @@ public class JarRunRequestBody extends JarRequestBody {
                 allowNonRestoredState,
                 savepointPath,
                 null,
-                restoreMode,
+                recoveryClaimMode,
                 flinkConfiguration);
     }
 
@@ -104,8 +104,9 @@ public class JarRunRequestBody extends JarRequestBody {
                     Boolean allowNonRestoredState,
             @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String 
savepointPath,
             @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
-                    RestoreMode deprecatedRestoreMode,
-            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE) 
RestoreMode restoreMode,
+                    RecoveryClaimMode deprecatedRecoveryClaimMode,
+            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
+                    RecoveryClaimMode recoveryClaimMode,
             @Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
                     Map<String, String> flinkConfiguration) {
         super(
@@ -117,8 +118,8 @@ public class JarRunRequestBody extends JarRequestBody {
                 flinkConfiguration);
         this.allowNonRestoredState = allowNonRestoredState;
         this.savepointPath = savepointPath;
-        this.deprecatedRestoreMode = deprecatedRestoreMode;
-        this.restoreMode = restoreMode;
+        this.deprecatedRecoveryClaimMode = deprecatedRecoveryClaimMode;
+        this.recoveryClaimMode = recoveryClaimMode;
     }
 
     @Nullable
@@ -135,12 +136,12 @@ public class JarRunRequestBody extends JarRequestBody {
 
     @Nullable
     @JsonIgnore
-    public RestoreMode getRestoreMode() {
-        return restoreMode == null ? deprecatedRestoreMode : restoreMode;
+    public RecoveryClaimMode getRecoveryClaimMode() {
+        return recoveryClaimMode == null ? deprecatedRecoveryClaimMode : 
recoveryClaimMode;
     }
 
     @JsonIgnore
     public boolean isDeprecatedRestoreModeHasValue() {
-        return deprecatedRestoreMode != null;
+        return deprecatedRecoveryClaimMode != null;
     }
 }
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 4eecb9d174a..fb4de49f2b0 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -74,7 +74,7 @@ class JarRunHandlerParameterTest
         extends JarHandlerParameterTest<JarRunRequestBody, 
JarRunMessageParameters> {
     private static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true;
     private static final String RESTORE_PATH = "/foo/bar";
-    private static final RestoreMode RESTORE_MODE = RestoreMode.CLAIM;
+    private static final RecoveryClaimMode RESTORE_MODE = 
RecoveryClaimMode.CLAIM;
 
     @RegisterExtension
     private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
@@ -339,7 +339,7 @@ class JarRunHandlerParameterTest
 
         final SavepointRestoreSettings savepointRestoreSettings =
                 jobGraph.getSavepointRestoreSettings();
-        assertThat(savepointRestoreSettings.getRestoreMode())
+        assertThat(savepointRestoreSettings.getRecoveryClaimMode())
                 
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.RESTORE_MODE));
         assertThat(savepointRestoreSettings.getRestorePath())
                 
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.SAVEPOINT_PATH));
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
index c53eaaaba90..5b8ceaeaf96 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
 
@@ -49,7 +49,7 @@ class JarRunRequestBodyTest extends 
RestRequestMarshallingTestBase<JarRunRequest
                 new JobID(),
                 true,
                 "foo/bar",
-                RestoreMode.CLAIM,
+                RecoveryClaimMode.CLAIM,
                 Collections.singletonMap("key", "value"));
     }
 
@@ -64,7 +64,7 @@ class JarRunRequestBodyTest extends 
RestRequestMarshallingTestBase<JarRunRequest
         assertThat(actual.getAllowNonRestoredState())
                 .isEqualTo(expected.getAllowNonRestoredState());
         
assertThat(actual.getSavepointPath()).isEqualTo(expected.getSavepointPath());
-        
assertThat(actual.getRestoreMode()).isEqualTo(expected.getRestoreMode());
+        
assertThat(actual.getRecoveryClaimMode()).isEqualTo(expected.getRecoveryClaimMode());
         assertThat(actual.getFlinkConfiguration().toMap())
                 
.containsExactlyEntriesOf(expected.getFlinkConfiguration().toMap());
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 1606cad4126..ca9209ec19c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1890,7 +1890,7 @@ public class CheckpointCoordinator {
 
         // convert to checkpoint so the system can fall back to it
         final CheckpointProperties checkpointProperties;
-        switch (restoreSettings.getRestoreMode()) {
+        switch (restoreSettings.getRecoveryClaimMode()) {
             case CLAIM:
                 checkpointProperties = this.checkpointProperties;
                 break;
@@ -1923,7 +1923,7 @@ public class CheckpointCoordinator {
         // because the latter might trigger subsumption so the ref counts must 
be up-to-date
         savepoint.registerSharedStatesAfterRestored(
                 completedCheckpointStore.getSharedStateRegistry(),
-                restoreSettings.getRestoreMode());
+                restoreSettings.getRecoveryClaimMode());
 
         completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                 savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index d53720093ec..56d0830eaf4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 
 import java.io.Serializable;
@@ -93,7 +93,10 @@ public class CheckpointProperties implements Serializable {
         return forced;
     }
 
-    /** Returns whether the checkpoint should be restored in a {@link 
RestoreMode#NO_CLAIM} mode. */
+    /**
+     * Returns whether the checkpoint should be restored in a {@link 
RecoveryClaimMode#NO_CLAIM}
+     * mode.
+     */
     public boolean isUnclaimed() {
         return unclaimed;
     }
@@ -302,11 +305,12 @@ public class CheckpointProperties implements Serializable 
{
     }
 
     /**
-     * Creates the checkpoint properties for a snapshot restored in {@link 
RestoreMode#NO_CLAIM}.
-     * Those properties should not be used when triggering a 
checkpoint/savepoint. They're useful
-     * when restoring a {@link CompletedCheckpointStore} after a JM failover.
+     * Creates the checkpoint properties for a snapshot restored in {@link
+     * RecoveryClaimMode#NO_CLAIM}. Those properties should not be used when 
triggering a
+     * checkpoint/savepoint. They're useful when restoring a {@link 
CompletedCheckpointStore} after
+     * a JM failover.
      *
-     * @return Checkpoint properties for a snapshot restored in {@link 
RestoreMode#NO_CLAIM}.
+     * @return Checkpoint properties for a snapshot restored in {@link 
RecoveryClaimMode#NO_CLAIM}.
      */
     public static CheckpointProperties forUnclaimedSnapshot() {
         return new CheckpointProperties(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index b06fa9427c6..288faf53f9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
@@ -38,7 +38,7 @@ public interface CheckpointRecoveryFactory {
      * @param sharedStateRegistryFactory Simple factory to produce {@link 
SharedStateRegistry}
      *     objects.
      * @param ioExecutor Executor used to run (async) deletes.
-     * @param restoreMode the claim mode with which the job is restoring.
+     * @param recoveryClaimMode the claim mode with which the job is restoring.
      * @return {@link CompletedCheckpointStore} instance for the job
      */
     CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
@@ -46,7 +46,7 @@ public interface CheckpointRecoveryFactory {
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception;
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index ae602655e61..82c688eac52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -250,13 +250,13 @@ public class CompletedCheckpoint implements Serializable, 
Checkpoint {
      * is added into the store.
      *
      * @param sharedStateRegistry The registry where shared states are 
registered
-     * @param restoreMode the mode in which this checkpoint was restored from
+     * @param recoveryClaimMode the mode in which this checkpoint was restored 
from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry, RecoveryClaimMode 
recoveryClaimMode) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
-            sharedStateRegistry.registerAllAfterRestored(this, restoreMode);
+            sharedStateRegistry.registerAllAfterRestored(this, 
recoveryClaimMode);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index a7ba6afd313..df80f36ccea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.Executors;
@@ -61,18 +61,18 @@ public class EmbeddedCompletedCheckpointStore extends 
AbstractCompleteCheckpoint
                 maxRetainedCheckpoints,
                 Collections.emptyList(),
                 /* Using the default claim mode in tests to detect any 
breaking changes early. */
-                RestoreMode.DEFAULT);
+                RecoveryClaimMode.DEFAULT);
     }
 
     public EmbeddedCompletedCheckpointStore(
             int maxRetainedCheckpoints,
             Collection<CompletedCheckpoint> initialCheckpoints,
-            RestoreMode restoreMode) {
+            RecoveryClaimMode recoveryClaimMode) {
         this(
                 maxRetainedCheckpoints,
                 initialCheckpoints,
                 SharedStateRegistry.DEFAULT_FACTORY.create(
-                        Executors.directExecutor(), initialCheckpoints, 
restoreMode));
+                        Executors.directExecutor(), initialCheckpoints, 
recoveryClaimMode));
     }
 
     public EmbeddedCompletedCheckpointStore(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index 7b37a641759..37eda00d60d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
 import javax.annotation.Nullable;
@@ -43,7 +43,7 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
     public static <T extends CompletedCheckpointStore>
             CheckpointRecoveryFactory 
withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
         return new PerJobCheckpointRecoveryFactory<>(
-                (maxCheckpoints, previous, sharedStateRegistry, ioExecutor, 
restoreMode) -> {
+                (maxCheckpoints, previous, sharedStateRegistry, ioExecutor, 
recoveryClaimMode) -> {
                     if (previous != null) {
                         throw new UnsupportedOperationException(
                                 "Checkpoint store recovery is not supported.");
@@ -77,7 +77,7 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode) {
+            RecoveryClaimMode recoveryClaimMode) {
         return store.compute(
                 jobId,
                 (key, previous) ->
@@ -86,7 +86,7 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
                                 previous,
                                 sharedStateRegistryFactory,
                                 ioExecutor,
-                                restoreMode));
+                                recoveryClaimMode));
     }
 
     @Override
@@ -102,6 +102,6 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
                 @Nullable StoreType previousStore,
                 SharedStateRegistryFactory sharedStateRegistryFactory,
                 Executor ioExecutor,
-                RestoreMode restoreMode);
+                RecoveryClaimMode recoveryClaimMode);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index cb78fa39442..e273151e519 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
@@ -34,14 +34,14 @@ public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
 
         return new StandaloneCompletedCheckpointStore(
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
                 ioExecutor,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 801dced7a95..e4b098b0a38 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -62,13 +62,13 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompleteCheckpoi
                 SharedStateRegistry.DEFAULT_FACTORY,
                 Executors.directExecutor(),
                 /* Using the default mode in tests to detect any breaking 
changes early. */
-                RestoreMode.DEFAULT);
+                RecoveryClaimMode.DEFAULT);
     }
 
     /**
      * Creates {@link StandaloneCompletedCheckpointStore}.
      *
-     * @param restoreMode
+     * @param recoveryClaimMode
      * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints 
to retain (at least
      *     1). Adding more checkpoints than this results in older checkpoints 
being discarded.
      */
@@ -76,13 +76,13 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompleteCheckpoi
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode) {
+            RecoveryClaimMode recoveryClaimMode) {
         this(
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
                 new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1),
                 ioExecutor,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     private StandaloneCompletedCheckpointStore(
@@ -90,8 +90,8 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompleteCheckpoi
             SharedStateRegistryFactory sharedStateRegistryFactory,
             ArrayDeque<CompletedCheckpoint> checkpoints,
             Executor ioExecutor,
-            RestoreMode restoreMode) {
-        super(sharedStateRegistryFactory.create(ioExecutor, checkpoints, 
restoreMode));
+            RecoveryClaimMode recoveryClaimMode) {
+        super(sharedStateRegistryFactory.create(ioExecutor, checkpoints, 
recoveryClaimMode));
         checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at 
least one checkpoint.");
         this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
         this.checkpoints = checkpoints;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 642d5a570d9..7f7494023f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -53,7 +53,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
 
         return ZooKeeperUtils.createCompletedCheckpoints(
@@ -64,7 +64,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
                 sharedStateRegistryFactory,
                 ioExecutor,
                 executor,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 59128d853b1..2211062a2ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
@@ -150,10 +150,10 @@ public class CheckpointResourcesCleanupRunner implements 
JobManagerRunner {
                         jobManagerConfiguration, LOG),
                 sharedStateRegistryFactory,
                 cleanupExecutor,
-                // Using RestoreMode.CLAIM to be able to discard shared state, 
if any.
+                // Using RecoveryClaimMode.CLAIM to be able to discard shared 
state, if any.
                 // Note that it also means that the original shared state 
might be discarded as well
                 // because the initial checkpoint might be subsumed.
-                RestoreMode.CLAIM);
+                RecoveryClaimMode.CLAIM);
     }
 
     private CheckpointIDCounter createCheckpointIDCounter() throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index b9859ba0f27..69d97d18577 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -44,14 +44,14 @@ public class EmbeddedHaServicesWithLeadershipControl 
extends EmbeddedHaServices
                                 previous,
                                 stateRegistryFactory,
                                 ioExecutor,
-                                restoreMode) -> {
+                                recoveryClaimMode) -> {
                             List<CompletedCheckpoint> checkpoints =
                                     previous != null
                                             ? previous.getAllCheckpoints()
                                             : Collections.emptyList();
                             SharedStateRegistry stateRegistry =
                                     stateRegistryFactory.create(
-                                            ioExecutor, checkpoints, 
restoreMode);
+                                            ioExecutor, checkpoints, 
recoveryClaimMode);
                             if (previous != null) {
                                 if (!previous.getShutdownStatus().isPresent()) 
{
                                     throw new IllegalStateException(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index 7121f1d506f..4431ba3791f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobgraph;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -56,10 +56,10 @@ public class SavepointConfigOptions {
     /**
      * Describes the mode how Flink should restore from the given savepoint or 
retained checkpoint.
      */
-    public static final ConfigOption<RestoreMode> RESTORE_MODE =
+    public static final ConfigOption<RecoveryClaimMode> RESTORE_MODE =
             key("execution.savepoint-restore-mode")
-                    .enumType(RestoreMode.class)
-                    .defaultValue(RestoreMode.DEFAULT)
+                    .enumType(RecoveryClaimMode.class)
+                    .defaultValue(RecoveryClaimMode.DEFAULT)
                     .withDescription(
                             "Describes the mode how Flink should restore from 
the given"
                                     + " savepoint or retained checkpoint.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 663313bfb87..e7bdb2fdead 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobgraph;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.StateRecoveryOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 
 import javax.annotation.Nonnull;
 
@@ -37,7 +37,7 @@ public class SavepointRestoreSettings implements Serializable 
{
 
     /** No restore should happen. */
     private static final SavepointRestoreSettings NONE =
-            new SavepointRestoreSettings(null, false, RestoreMode.NO_CLAIM);
+            new SavepointRestoreSettings(null, false, 
RecoveryClaimMode.NO_CLAIM);
 
     /** Savepoint restore path. */
     private final String restorePath;
@@ -48,20 +48,22 @@ public class SavepointRestoreSettings implements 
Serializable {
      */
     private final boolean allowNonRestoredState;
 
-    private final @Nonnull RestoreMode restoreMode;
+    private final @Nonnull RecoveryClaimMode recoveryClaimMode;
 
     /**
      * Creates the restore settings.
      *
      * @param restorePath Savepoint restore path.
      * @param allowNonRestoredState Ignore unmapped state.
-     * @param restoreMode how to restore from the savepoint
+     * @param recoveryClaimMode how to restore from the savepoint
      */
     private SavepointRestoreSettings(
-            String restorePath, boolean allowNonRestoredState, @Nonnull 
RestoreMode restoreMode) {
+            String restorePath,
+            boolean allowNonRestoredState,
+            @Nonnull RecoveryClaimMode recoveryClaimMode) {
         this.restorePath = restorePath;
         this.allowNonRestoredState = allowNonRestoredState;
-        this.restoreMode = restoreMode;
+        this.recoveryClaimMode = recoveryClaimMode;
     }
 
     /**
@@ -94,8 +96,8 @@ public class SavepointRestoreSettings implements Serializable 
{
     }
 
     /** Tells how to restore from the given savepoint. */
-    public @Nonnull RestoreMode getRestoreMode() {
-        return restoreMode;
+    public @Nonnull RecoveryClaimMode getRecoveryClaimMode() {
+        return recoveryClaimMode;
     }
 
     @Override
@@ -110,13 +112,13 @@ public class SavepointRestoreSettings implements 
Serializable {
         SavepointRestoreSettings that = (SavepointRestoreSettings) o;
         return allowNonRestoredState == that.allowNonRestoredState
                 && Objects.equals(restorePath, that.restorePath)
-                && Objects.equals(restoreMode, that.restoreMode);
+                && Objects.equals(recoveryClaimMode, that.recoveryClaimMode);
     }
 
     @Override
     public int hashCode() {
         int result = restorePath != null ? restorePath.hashCode() : 0;
-        result = 31 * result + restoreMode.hashCode();
+        result = 31 * result + recoveryClaimMode.hashCode();
         result = 31 * result + (allowNonRestoredState ? 1 : 0);
         return result;
     }
@@ -130,8 +132,8 @@ public class SavepointRestoreSettings implements 
Serializable {
                     + '\''
                     + ", allowNonRestoredState="
                     + allowNonRestoredState
-                    + ", restoreMode="
-                    + restoreMode
+                    + ", recoveryClaimMode="
+                    + recoveryClaimMode
                     + ')';
         } else {
             return "SavepointRestoreSettings.none()";
@@ -160,9 +162,12 @@ public class SavepointRestoreSettings implements 
Serializable {
     }
 
     public static SavepointRestoreSettings forPath(
-            String savepointPath, boolean allowNonRestoredState, @Nonnull 
RestoreMode restoreMode) {
+            String savepointPath,
+            boolean allowNonRestoredState,
+            @Nonnull RecoveryClaimMode recoveryClaimMode) {
         checkNotNull(savepointPath, "Savepoint restore path.");
-        return new SavepointRestoreSettings(savepointPath, 
allowNonRestoredState, restoreMode);
+        return new SavepointRestoreSettings(
+                savepointPath, allowNonRestoredState, recoveryClaimMode);
     }
 
     // -------------------------- Parsing to and from a configuration object
@@ -175,7 +180,7 @@ public class SavepointRestoreSettings implements 
Serializable {
                 StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
                 savepointRestoreSettings.allowNonRestoredState());
         configuration.set(
-                StateRecoveryOptions.RESTORE_MODE, 
savepointRestoreSettings.getRestoreMode());
+                StateRecoveryOptions.RESTORE_MODE, 
savepointRestoreSettings.getRecoveryClaimMode());
         final String savepointPath = savepointRestoreSettings.getRestorePath();
         if (savepointPath != null) {
             configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, 
savepointPath);
@@ -186,9 +191,11 @@ public class SavepointRestoreSettings implements 
Serializable {
         final String savepointPath = 
configuration.get(StateRecoveryOptions.SAVEPOINT_PATH);
         final boolean allowNonRestored =
                 
configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
-        final RestoreMode restoreMode = 
configuration.get(StateRecoveryOptions.RESTORE_MODE);
+        final RecoveryClaimMode recoveryClaimMode =
+                configuration.get(StateRecoveryOptions.RESTORE_MODE);
         return savepointPath == null
                 ? SavepointRestoreSettings.none()
-                : SavepointRestoreSettings.forPath(savepointPath, 
allowNonRestored, restoreMode);
+                : SavepointRestoreSettings.forPath(
+                        savepointPath, allowNonRestored, recoveryClaimMode);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2bfe899380d..4e26d729cbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -33,7 +33,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.execution.CheckpointType;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -1093,10 +1093,10 @@ public class MiniCluster implements AutoCloseableAsync {
         final SavepointRestoreSettings savepointRestoreSettings =
                 jobGraph.getSavepointRestoreSettings();
         if (overrideRestoreModeForChangelogStateBackend
-                && savepointRestoreSettings.getRestoreMode() == 
RestoreMode.NO_CLAIM) {
+                && savepointRestoreSettings.getRecoveryClaimMode() == 
RecoveryClaimMode.NO_CLAIM) {
             final Configuration conf = new Configuration();
             SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, 
conf);
-            conf.set(StateRecoveryOptions.RESTORE_MODE, RestoreMode.LEGACY);
+            conf.set(StateRecoveryOptions.RESTORE_MODE, 
RecoveryClaimMode.LEGACY);
             
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf));
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index 13bdcd119d8..2456f4354d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -64,7 +64,7 @@ public final class SchedulerUtils {
                         ioExecutor,
                         log,
                         jobId,
-                        
jobGraph.getSavepointRestoreSettings().getRestoreMode());
+                        
jobGraph.getSavepointRestoreSettings().getRecoveryClaimMode());
             } catch (Exception e) {
                 throw new JobExecutionException(
                         jobId,
@@ -83,7 +83,7 @@ public final class SchedulerUtils {
             Executor ioExecutor,
             Logger log,
             JobID jobId,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
         return recoveryFactory.createRecoveredCompletedCheckpointStore(
                 jobId,
@@ -91,7 +91,7 @@ public final class SchedulerUtils {
                         jobManagerConfig, log),
                 SharedStateRegistry.DEFAULT_FACTORY,
                 ioExecutor,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     public static CheckpointIDCounter 
createCheckpointIDCounterIfCheckpointingIsEnabled(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 2cc7de88f98..8c76fbf6d97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 
 import java.util.Set;
@@ -35,11 +35,12 @@ public interface SharedStateRegistry extends AutoCloseable {
 
     /** A singleton object for the default implementation of a {@link 
SharedStateRegistryFactory} */
     SharedStateRegistryFactory DEFAULT_FACTORY =
-            (deleteExecutor, checkpoints, restoreMode) -> {
+            (deleteExecutor, checkpoints, recoveryClaimMode) -> {
                 SharedStateRegistry sharedStateRegistry =
                         new SharedStateRegistryImpl(deleteExecutor);
                 for (CompletedCheckpoint checkpoint : checkpoints) {
-                    
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, restoreMode);
+                    checkpoint.registerSharedStatesAfterRestored(
+                            sharedStateRegistry, recoveryClaimMode);
                 }
                 return sharedStateRegistry;
             };
@@ -87,7 +88,7 @@ public interface SharedStateRegistry extends AutoCloseable {
      * Register given shared states in the registry.
      *
      * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after 
recovery), please use
-     * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+     * {@link #registerAllAfterRestored(CompletedCheckpoint, 
RecoveryClaimMode)}
      *
      * @param stateHandles The shared states to register.
      * @param checkpointID which uses the states.
@@ -99,12 +100,12 @@ public interface SharedStateRegistry extends AutoCloseable 
{
      *
      * <p>After recovery from an incremental checkpoint, its state should NOT 
be discarded, even if
      * {@link #unregisterUnusedState(long) not used} anymore (unless 
recovering in {@link
-     * RestoreMode#CLAIM CLAIM} mode).
+     * RecoveryClaimMode#CLAIM CLAIM} mode).
      *
      * <p>This should hold for both cases: when recovering from that initial 
checkpoint; and from
      * any subsequent checkpoint derived from it.
      */
-    void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode 
mode);
+    void registerAllAfterRestored(CompletedCheckpoint checkpoint, 
RecoveryClaimMode mode);
 
     void checkpointCompleted(long checkpointId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
index 288d60ed028..be62fdcf03e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 
 import java.util.Collection;
@@ -32,11 +32,11 @@ public interface SharedStateRegistryFactory {
      *
      * @param deleteExecutor executor used to run (async) deletes.
      * @param checkpoints whose shared state will be registered.
-     * @param restoreMode the mode in which the given checkpoints were restored
+     * @param recoveryClaimMode the mode in which the given checkpoints were 
restored
      * @return a SharedStateRegistry object
      */
     SharedStateRegistry create(
             Executor deleteExecutor,
             Collection<CompletedCheckpoint> checkpoints,
-            RestoreMode restoreMode);
+            RecoveryClaimMode recoveryClaimMode);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index 42995d41e7c..9dae5b1c0cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SnapshotType.SharingFilesStrategy;
@@ -210,7 +210,7 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
     }
 
     @Override
-    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, 
RestoreMode mode) {
+    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, 
RecoveryClaimMode mode) {
         registerAll(checkpoint.getOperatorStates().values(), 
checkpoint.getCheckpointID());
         restoredCheckpointSharingStrategies.put(
                 checkpoint.getCheckpointID(),
@@ -222,7 +222,7 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
         // checking entry.createdByCheckpointID against it on checkpoint 
subsumption.
         // In CLAIM mode, the shared state of the initial checkpoints must be
         // discarded as soon as it becomes unused - so 
highestRetainCheckpointID is not updated.
-        if (mode != RestoreMode.CLAIM) {
+        if (mode != RecoveryClaimMode.CLAIM) {
             highestNotClaimedCheckpointID =
                     Math.max(highestNotClaimedCheckpointID, 
checkpoint.getCheckpointID());
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index e83d057abc9..c0c20e8193b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
@@ -157,13 +157,13 @@ public interface StateBackend extends 
java.io.Serializable {
     }
 
     /**
-     * Tells if a state backend supports the {@link RestoreMode#NO_CLAIM} mode.
+     * Tells if a state backend supports the {@link 
RecoveryClaimMode#NO_CLAIM} mode.
      *
      * <p>If a state backend supports {@code NO_CLAIM} mode, it should create 
an independent
      * snapshot when it receives {@link CheckpointType#FULL_CHECKPOINT} in 
{@link
      * Snapshotable#snapshot(long, long, CheckpointStreamFactory, 
CheckpointOptions)}.
      *
-     * @return If the state backend supports {@link RestoreMode#NO_CLAIM} mode.
+     * @return If the state backend supports {@link 
RecoveryClaimMode#NO_CLAIM} mode.
      */
     default boolean supportsNoClaimRestoreMode() {
         return false;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index c923bea12c8..d2944a95a74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
@@ -561,7 +561,7 @@ public class ZooKeeperUtils {
      * @param configuration {@link Configuration} object
      * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints 
to retain
      * @param executor to run ZooKeeper callbacks
-     * @param restoreMode the mode in which the job is being restored
+     * @param recoveryClaimMode the mode in which the job is being restored
      * @return {@link DefaultCompletedCheckpointStore} instance
      * @throws Exception if the completed checkpoint store cannot be created
      */
@@ -572,7 +572,7 @@ public class ZooKeeperUtils {
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
             Executor executor,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
 
         checkNotNull(configuration, "Configuration");
@@ -592,7 +592,7 @@ public class ZooKeeperUtils {
                         ZooKeeperCheckpointStoreUtil.INSTANCE,
                         completedCheckpoints,
                         sharedStateRegistryFactory.create(
-                                ioExecutor, completedCheckpoints, restoreMode),
+                                ioExecutor, completedCheckpoints, 
recoveryClaimMode),
                         executor);
         LOG.info(
                 "Initialized {} in '{}' with {}.",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e830b8a0343..e8591ad5cc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTim
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.fs.AutoCloseableRegistry;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.security.FlinkSecurityManager;
@@ -1475,7 +1475,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                             "Configured state backend (%s) does not support 
enforcing a full"
                                     + " snapshot. If you are restoring in %s 
mode, please"
                                     + " consider choosing %s mode.",
-                            stateBackend, RestoreMode.NO_CLAIM, 
RestoreMode.CLAIM));
+                            stateBackend, RecoveryClaimMode.NO_CLAIM, 
RecoveryClaimMode.CLAIM));
         } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
             SavepointType savepointType = (SavepointType) 
checkpointOptions.getCheckpointType();
             if 
(!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index f2ddf91c612..52cc0d1bcf2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
@@ -285,7 +285,7 @@ class CheckpointCoordinatorFailureTest {
         public FailingCompletedCheckpointStore(Exception addCheckpointFailure) 
{
             super(
                     SharedStateRegistry.DEFAULT_FACTORY.create(
-                            Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT));
+                            Executors.directExecutor(), emptyList(), 
RecoveryClaimMode.DEFAULT));
             this.addCheckpointFailure = addCheckpointFailure;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 484200c3676..1dd9f2f2b69 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.OperatorIDPair;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
@@ -238,7 +238,9 @@ class CheckpointCoordinatorRestoringTest {
         final ExecutionGraph executionGraph = createExecutionGraph(vertices);
         final EmbeddedCompletedCheckpointStore store =
                 new EmbeddedCompletedCheckpointStore(
-                        completedCheckpoints.size(), completedCheckpoints, 
RestoreMode.DEFAULT);
+                        completedCheckpoints.size(),
+                        completedCheckpoints,
+                        RecoveryClaimMode.DEFAULT);
 
         // set up the coordinator and validate the initial state
         final CheckpointCoordinator coordinator =
@@ -780,7 +782,7 @@ class CheckpointCoordinatorRestoringTest {
         // set up the coordinator and validate the initial state
         SharedStateRegistry sharedStateRegistry =
                 SharedStateRegistry.DEFAULT_FACTORY.create(
-                        Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT);
+                        Executors.directExecutor(), emptyList(), 
RecoveryClaimMode.DEFAULT);
         CheckpointCoordinator coord =
                 new CheckpointCoordinatorBuilder()
                         .setCompletedCheckpointStore(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1ac7cf7fe3f..c5ac3596a61 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -2911,7 +2911,7 @@ class CheckpointCoordinatorTest {
 
     @Test
     void testSharedStateRegistrationOnRestore() throws Exception {
-        for (RestoreMode restoreMode : RestoreMode.values()) {
+        for (RecoveryClaimMode recoveryClaimMode : RecoveryClaimMode.values()) 
{
             JobVertexID jobVertexID1 = new JobVertexID();
 
             int parallelism1 = 2;
@@ -2929,7 +2929,7 @@ class CheckpointCoordinatorTest {
                     SharedStateRegistry.DEFAULT_FACTORY.create(
                             
org.apache.flink.util.concurrent.Executors.directExecutor(),
                             checkpoints,
-                            restoreMode);
+                            recoveryClaimMode);
             final EmbeddedCompletedCheckpointStore store =
                     new EmbeddedCompletedCheckpointStore(10, checkpoints, 
firstInstance);
 
@@ -3035,7 +3035,7 @@ class CheckpointCoordinatorTest {
                     SharedStateRegistry.DEFAULT_FACTORY.create(
                             
org.apache.flink.util.concurrent.Executors.directExecutor(),
                             store.getAllCheckpoints(),
-                            restoreMode);
+                            recoveryClaimMode);
             final EmbeddedCompletedCheckpointStore secondStore =
                     new EmbeddedCompletedCheckpointStore(
                             10, store.getAllCheckpoints(), secondInstance);
@@ -3079,7 +3079,7 @@ class CheckpointCoordinatorTest {
             verifyDiscard(
                     sharedHandlesByCheckpoint,
                     cpId ->
-                            restoreMode == RestoreMode.CLAIM && cpId == 0
+                            recoveryClaimMode == RecoveryClaimMode.CLAIM && 
cpId == 0
                                     ? TernaryBoolean.TRUE
                                     : TernaryBoolean.FALSE);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 9d611970682..12b404bf42a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -211,7 +211,7 @@ class CheckpointCoordinatorTriggeringTest {
                 createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
         checkpointCoordinator.restoreSavepoint(
                 SavepointRestoreSettings.forPath(
-                        savepoint.getExternalPointer(), true, 
RestoreMode.NO_CLAIM),
+                        savepoint.getExternalPointer(), true, 
RecoveryClaimMode.NO_CLAIM),
                 graph.getAllVertices(),
                 this.getClass().getClassLoader());
         checkpointCoordinator.shutdown();
@@ -265,7 +265,7 @@ class CheckpointCoordinatorTriggeringTest {
                 createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
         checkpointCoordinator.restoreSavepoint(
                 SavepointRestoreSettings.forPath(
-                        savepoint.getExternalPointer(), true, 
RestoreMode.NO_CLAIM),
+                        savepoint.getExternalPointer(), true, 
RecoveryClaimMode.NO_CLAIM),
                 graph.getAllVertices(),
                 this.getClass().getClassLoader());
 
@@ -431,7 +431,7 @@ class CheckpointCoordinatorTriggeringTest {
                 createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
         checkpointCoordinator.restoreSavepoint(
                 SavepointRestoreSettings.forPath(
-                        savepoint.getExternalPointer(), true, 
RestoreMode.NO_CLAIM),
+                        savepoint.getExternalPointer(), true, 
RecoveryClaimMode.NO_CLAIM),
                 graph.getAllVertices(),
                 this.getClass().getClassLoader());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index b518f3f948c..c7101adb38e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -230,7 +230,8 @@ public class CompletedCheckpointTest {
                         null);
 
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
-        checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, 
RestoreMode.DEFAULT);
+        checkpoint.registerSharedStatesAfterRestored(
+                sharedStateRegistry, RecoveryClaimMode.DEFAULT);
         verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
     }
 
@@ -262,7 +263,8 @@ public class CompletedCheckpointTest {
                         null);
 
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
-        checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, 
RestoreMode.DEFAULT);
+        checkpoint.registerSharedStatesAfterRestored(
+                sharedStateRegistry, RecoveryClaimMode.DEFAULT);
         verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
 
         // Subsume
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 6658718ef7e..ef947dca6fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.persistence.StateHandleStore;
 import 
org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
@@ -388,7 +388,7 @@ class DefaultCompletedCheckpointStoreTest {
                 SharedStateRegistry.DEFAULT_FACTORY.create(
                         
org.apache.flink.util.concurrent.Executors.directExecutor(),
                         emptyList(),
-                        RestoreMode.DEFAULT),
+                        RecoveryClaimMode.DEFAULT),
                 executorService);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
index 29df40671db..48ba3aa8fa7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.concurrent.Executors;
 
@@ -49,7 +49,7 @@ class PerJobCheckpointRecoveryTest {
                                 1,
                                 SharedStateRegistry.DEFAULT_FACTORY,
                                 Executors.directExecutor(),
-                                RestoreMode.DEFAULT))
+                                RecoveryClaimMode.DEFAULT))
                 .isSameAs(store);
         assertThatExceptionOfType(UnsupportedOperationException.class)
                 .isThrownBy(
@@ -59,7 +59,7 @@ class PerJobCheckpointRecoveryTest {
                                         1,
                                         SharedStateRegistry.DEFAULT_FACTORY,
                                         Executors.directExecutor(),
-                                        RestoreMode.DEFAULT));
+                                        RecoveryClaimMode.DEFAULT));
 
         final JobID secondJobId = new JobID();
         assertThat(
@@ -68,7 +68,7 @@ class PerJobCheckpointRecoveryTest {
                                 1,
                                 SharedStateRegistry.DEFAULT_FACTORY,
                                 Executors.directExecutor(),
-                                RestoreMode.DEFAULT))
+                                RecoveryClaimMode.DEFAULT))
                 .isSameAs(store);
         assertThatExceptionOfType(UnsupportedOperationException.class)
                 .isThrownBy(
@@ -78,6 +78,6 @@ class PerJobCheckpointRecoveryTest {
                                         1,
                                         SharedStateRegistry.DEFAULT_FACTORY,
                                         Executors.directExecutor(),
-                                        RestoreMode.DEFAULT));
+                                        RecoveryClaimMode.DEFAULT));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index bff19c303c7..d627bb6901d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -18,7 +18,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
 import java.util.concurrent.Executor;
@@ -41,7 +41,7 @@ public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFacto
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            RestoreMode restoreMode) {
+            RecoveryClaimMode recoveryClaimMode) {
         return store;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index f8fd6ad6655..37723e0feea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
@@ -89,7 +89,7 @@ class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpointStoreTe
                 
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                         checkpointsInZooKeeper, checkpointStoreUtil),
                 SharedStateRegistry.DEFAULT_FACTORY.create(
-                        Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT),
+                        Executors.directExecutor(), emptyList(), 
RecoveryClaimMode.DEFAULT),
                 executor);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index f9811a597a9..d963f9cf460 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import 
org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
@@ -202,7 +202,7 @@ class ZooKeeperCompletedCheckpointStoreTest {
                 zooKeeperCheckpointStoreUtil,
                 Collections.emptyList(),
                 SharedStateRegistry.DEFAULT_FACTORY.create(
-                        Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT),
+                        Executors.directExecutor(), emptyList(), 
RecoveryClaimMode.DEFAULT),
                 Executors.directExecutor());
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 12207ed7883..70346d0ba78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CleanupOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
@@ -90,7 +90,7 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
                                 previous,
                                 sharedStateRegistryFactory,
                                 ioExecutor,
-                                restoreMode) -> {
+                                recoveryClaimMode) -> {
                             if (previous != null) {
                                 // First job cleanup still succeeded for the
                                 // CompletedCheckpointStore because the 
JobGraph cleanup happens
@@ -103,7 +103,7 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
                                         sharedStateRegistryFactory.create(
                                                 ioExecutor,
                                                 previous.getAllCheckpoints(),
-                                                restoreMode));
+                                                recoveryClaimMode));
                             }
                             return new EmbeddedCompletedCheckpointStore(
                                     maxCheckpoints,
@@ -111,7 +111,7 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
                                     sharedStateRegistryFactory.create(
                                             ioExecutor,
                                             Collections.emptyList(),
-                                            RestoreMode.DEFAULT));
+                                            RecoveryClaimMode.DEFAULT));
                         }));
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
index 882177e419b..5d8f4ad7b2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -617,7 +617,7 @@ class CheckpointResourcesCleanupRunnerTest {
                 int maxNumberOfCheckpointsToRetain,
                 SharedStateRegistryFactory sharedStateRegistryFactory,
                 Executor ioExecutor,
-                RestoreMode restoreMode)
+                RecoveryClaimMode recoveryClaimMode)
                 throws Exception {
             creationLatch.await();
             return completedCheckpointStore;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
index dd6b2a67b41..ba73417075a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 
 import org.junit.jupiter.api.Test;
 
@@ -30,9 +30,9 @@ public class SavepointRestoreSettingsTest {
     @Test
     public void testEqualsWithDifferentRestoreMode() {
         SavepointRestoreSettings claimSettings =
-                SavepointRestoreSettings.forPath("/tmp", false, 
RestoreMode.CLAIM);
+                SavepointRestoreSettings.forPath("/tmp", false, 
RecoveryClaimMode.CLAIM);
         SavepointRestoreSettings noClaimSettings =
-                SavepointRestoreSettings.forPath("/tmp", false, 
RestoreMode.NO_CLAIM);
+                SavepointRestoreSettings.forPath("/tmp", false, 
RecoveryClaimMode.NO_CLAIM);
         assertNotEquals(claimSettings, noClaimSettings);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index e5ce3166a88..79113f7fb39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -80,7 +80,7 @@ class SchedulerUtilsTest {
                         Executors.directExecutor(),
                         log,
                         new JobID(),
-                        RestoreMode.CLAIM);
+                        RecoveryClaimMode.CLAIM);
 
         
assertThat(completedCheckpointStore.getMaxNumberOfRetainedCheckpoints())
                 .isEqualTo(maxNumberOfCheckpointsToRetain);
@@ -106,7 +106,7 @@ class SchedulerUtilsTest {
                         Executors.directExecutor(),
                         log,
                         new JobID(),
-                        RestoreMode.CLAIM);
+                        RecoveryClaimMode.CLAIM);
 
         SharedStateRegistry sharedStateRegistry = 
checkpointStore.getSharedStateRegistry();
 
@@ -135,13 +135,13 @@ class SchedulerUtilsTest {
                     int maxNumberOfCheckpointsToRetain,
                     SharedStateRegistryFactory sharedStateRegistryFactory,
                     Executor ioExecutor,
-                    RestoreMode restoreMode) {
+                    RecoveryClaimMode recoveryClaimMode) {
                 List<CompletedCheckpoint> checkpoints = 
singletonList(checkpoint);
                 return new EmbeddedCompletedCheckpointStore(
                         maxNumberOfCheckpointsToRetain,
                         checkpoints,
                         sharedStateRegistryFactory.create(
-                                ioExecutor, checkpoints, RestoreMode.DEFAULT));
+                                ioExecutor, checkpoints, 
RecoveryClaimMode.DEFAULT));
             }
 
             @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 39d8779c1d4..59f2f6ebefe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
@@ -378,7 +378,7 @@ class SharedStateRegistryTest {
                         new TestCompletedCheckpointStorageLocation(),
                         null,
                         properties),
-                RestoreMode.DEFAULT);
+                RecoveryClaimMode.DEFAULT);
     }
 
     private static class TestSharedState implements TestStreamStateHandle {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 106516b11b8..af80fd24759 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -21,7 +21,7 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.testutils;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -278,7 +278,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
                     SavepointRestoreSettings.forPath(
                             getSavepointPath(program, metadata).toString(),
                             false,
-                            RestoreMode.NO_CLAIM);
+                            RecoveryClaimMode.NO_CLAIM);
         }
         SavepointRestoreSettings.toConfiguration(restoreSettings, 
settings.getConfiguration());
         settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, 
"rocksdb");
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
index 7c9254d5ab6..a914ed03d28 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -176,6 +176,6 @@ public class ChangelogRecoverySwitchStateBackendITCase 
extends ChangelogRecovery
 
     private void setSavepointRestoreSettings(JobGraph jobGraph, String 
restorePath) {
         jobGraph.setSavepointRestoreSettings(
-                SavepointRestoreSettings.forPath(restorePath, false, 
RestoreMode.CLAIM));
+                SavepointRestoreSettings.forPath(restorePath, false, 
RecoveryClaimMode.CLAIM));
     }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 8f86af079d4..9c51b73e3df 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
@@ -83,11 +83,11 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
     private static final int NUM_TASK_MANAGERS = 2;
     private static final int SLOTS_PER_TASK_MANAGER = 2;
 
-    @Parameterized.Parameter public RestoreMode restoreMode;
+    @Parameterized.Parameter public RecoveryClaimMode recoveryClaimMode;
 
-    @Parameterized.Parameters(name = "RestoreMode = {0}")
+    @Parameterized.Parameters(name = "RecoveryClaimMode = {0}")
     public static Object[] parameters() {
-        return RestoreMode.values();
+        return RecoveryClaimMode.values();
     }
 
     @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
@@ -100,7 +100,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 null,
                 createRocksDBStateBackend(checkpointDir, true),
                 false,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Test
@@ -111,7 +111,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 null,
                 createRocksDBStateBackend(checkpointDir, false),
                 false,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Test
@@ -123,7 +123,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 null,
                 createRocksDBStateBackend(checkpointDir, true),
                 true,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Test
@@ -135,21 +135,21 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 null,
                 createRocksDBStateBackend(checkpointDir, false),
                 true,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Test
     public void testExternalizedFSCheckpointsStandalone() throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createFsStateBackend(checkpointDir), 
false, restoreMode);
+                checkpointDir, null, createFsStateBackend(checkpointDir), 
false, recoveryClaimMode);
     }
 
     @Test
     public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() 
throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createFsStateBackend(checkpointDir), 
true, restoreMode);
+                checkpointDir, null, createFsStateBackend(checkpointDir), 
true, recoveryClaimMode);
     }
 
     @Test
@@ -161,7 +161,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, true),
                     false,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -174,7 +174,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, false),
                     false,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -188,7 +188,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, true),
                     true,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -202,7 +202,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, false),
                     true,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -215,7 +215,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     zkServer.getConnectString(),
                     createFsStateBackend(checkpointDir),
                     false,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -228,7 +228,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     zkServer.getConnectString(),
                     createFsStateBackend(checkpointDir),
                     true,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -244,7 +244,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 newStateBackend,
                 previousStateBackend,
                 false,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Test
@@ -260,7 +260,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 newStateBackend,
                 previousStateBackend,
                 true,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     @Test
@@ -276,7 +276,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     newStateBackend,
                     previousStateBackend,
                     false,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -294,7 +294,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     newStateBackend,
                     previousStateBackend,
                     true,
-                    restoreMode);
+                    recoveryClaimMode);
         }
     }
 
@@ -313,7 +313,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
             String zooKeeperQuorum,
             StateBackend backend,
             boolean localRecovery,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
         testExternalizedCheckpoints(
                 checkpointDir,
@@ -322,7 +322,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                 backend,
                 backend,
                 localRecovery,
-                restoreMode);
+                recoveryClaimMode);
     }
 
     private static void testExternalizedCheckpoints(
@@ -332,7 +332,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
             StateBackend backend2,
             StateBackend backend3,
             boolean localRecovery,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
 
         final Configuration config = new Configuration();
@@ -371,12 +371,12 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
         try {
             // main test sequence:  start job -> eCP -> restore job -> eCP -> 
restore job
             String firstExternalCheckpoint =
-                    runJobAndGetExternalizedCheckpoint(backend1, null, 
cluster, restoreMode);
+                    runJobAndGetExternalizedCheckpoint(backend1, null, 
cluster, recoveryClaimMode);
             assertNotNull(firstExternalCheckpoint);
 
             String secondExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend2, firstExternalCheckpoint, cluster, 
restoreMode);
+                            backend2, firstExternalCheckpoint, cluster, 
recoveryClaimMode);
             assertNotNull(secondExternalCheckpoint);
 
             String thirdExternalCheckpoint =
@@ -385,11 +385,11 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                             // in CLAIM mode, the previous run is only 
guaranteed to preserve the
                             // latest checkpoint; in NO_CLAIM/LEGACY, even the 
initial checkpoints
                             // must remain valid
-                            restoreMode == RestoreMode.CLAIM
+                            recoveryClaimMode == RecoveryClaimMode.CLAIM
                                     ? secondExternalCheckpoint
                                     : firstExternalCheckpoint,
                             cluster,
-                            restoreMode);
+                            recoveryClaimMode);
             assertNotNull(thirdExternalCheckpoint);
         } finally {
             cluster.after();
@@ -400,24 +400,35 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
             StateBackend backend,
             @Nullable String externalCheckpoint,
             MiniClusterWithClientResource cluster,
-            RestoreMode restoreMode)
+            RecoveryClaimMode recoveryClaimMode)
             throws Exception {
         // complete at least two checkpoints so that the initial checkpoint 
can be subsumed
         return runJobAndGetExternalizedCheckpoint(
-                backend, externalCheckpoint, cluster, restoreMode, new 
Configuration(), 2, true);
+                backend,
+                externalCheckpoint,
+                cluster,
+                recoveryClaimMode,
+                new Configuration(),
+                2,
+                true);
     }
 
     static String runJobAndGetExternalizedCheckpoint(
             StateBackend backend,
             @Nullable String externalCheckpoint,
             MiniClusterWithClientResource cluster,
-            RestoreMode restoreMode,
+            RecoveryClaimMode recoveryClaimMode,
             Configuration jobConfig,
             int consecutiveCheckpoints,
             boolean retainCheckpoints)
             throws Exception {
         JobGraph initialJobGraph =
-                getJobGraph(backend, externalCheckpoint, restoreMode, 
jobConfig, retainCheckpoints);
+                getJobGraph(
+                        backend,
+                        externalCheckpoint,
+                        recoveryClaimMode,
+                        jobConfig,
+                        retainCheckpoints);
         NotifyingInfiniteTupleSource.countDownLatch = new 
CountDownLatch(PARALLELISM);
         cluster.getClusterClient().submitJob(initialJobGraph).get();
 
@@ -440,7 +451,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
     private static JobGraph getJobGraph(
             StateBackend backend,
             @Nullable String externalCheckpoint,
-            RestoreMode restoreMode,
+            RecoveryClaimMode recoveryClaimMode,
             Configuration jobConfig,
             boolean retainCheckpoints) {
         final StreamExecutionEnvironment env =
@@ -470,7 +481,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
         // recover from previous iteration?
         if (externalCheckpoint != null) {
             jobGraph.setSavepointRestoreSettings(
-                    SavepointRestoreSettings.forPath(externalCheckpoint, 
false, restoreMode));
+                    SavepointRestoreSettings.forPath(externalCheckpoint, 
false, recoveryClaimMode));
         }
 
         return jobGraph;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index b106011c085..e0214bf32f0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.StateChangelogOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
@@ -296,7 +296,7 @@ public class SavepointFormatITCase extends TestLogger {
         final JobGraph jobGraph = createJobGraph(config);
         jobGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(
-                        renamedSavepointDir.toUri().toString(), false, 
RestoreMode.CLAIM));
+                        renamedSavepointDir.toUri().toString(), false, 
RecoveryClaimMode.CLAIM));
 
         final JobID jobId = jobGraph.getJobID();
         ClusterClient<?> client = cluster.getClusterClient();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index c8ed8d2807f..d2fd4fed8e2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -421,7 +421,7 @@ public class SavepointITCase extends TestLogger {
         restoreJobAndVerifyState(
                 clusterFactory,
                 parallelism,
-                SavepointRestoreSettings.forPath(savepointPath, false, 
RestoreMode.CLAIM),
+                SavepointRestoreSettings.forPath(savepointPath, false, 
RecoveryClaimMode.CLAIM),
                 cluster -> {
                     cluster.after();
 
@@ -446,7 +446,7 @@ public class SavepointITCase extends TestLogger {
         restoreJobAndVerifyState(
                 clusterFactory,
                 parallelism,
-                SavepointRestoreSettings.forPath(savepointPath, false, 
RestoreMode.LEGACY),
+                SavepointRestoreSettings.forPath(savepointPath, false, 
RecoveryClaimMode.LEGACY),
                 cluster -> {
                     cluster.after();
 
@@ -533,7 +533,8 @@ public class SavepointITCase extends TestLogger {
 
             cluster.getClusterClient().cancel(jobID1).get();
             jobGraph.setSavepointRestoreSettings(
-                    SavepointRestoreSettings.forPath(firstCheckpoint, false, 
RestoreMode.NO_CLAIM));
+                    SavepointRestoreSettings.forPath(
+                            firstCheckpoint, false, 
RecoveryClaimMode.NO_CLAIM));
             final JobID jobID2 = new JobID();
             jobGraph.setJobID(jobID2);
             cluster.getClusterClient().submitJob(jobGraph).get();
@@ -548,7 +549,7 @@ public class SavepointITCase extends TestLogger {
             // on top of the first checkpoint
             jobGraph.setSavepointRestoreSettings(
                     SavepointRestoreSettings.forPath(
-                            secondCheckpoint, false, RestoreMode.NO_CLAIM));
+                            secondCheckpoint, false, 
RecoveryClaimMode.NO_CLAIM));
             final JobID jobID3 = new JobID();
             jobGraph.setJobID(jobID3);
             cluster.getClusterClient().submitJob(jobGraph).get();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
index 1030c314405..e5f5a0a19ac 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -67,36 +67,40 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
     public static Collection<Object[]> parameters() {
         return Arrays.asList(
                 new Object[][] {
-                    {RestoreMode.CLAIM, true},
-                    {RestoreMode.CLAIM, false},
-                    {RestoreMode.NO_CLAIM, true},
-                    {RestoreMode.NO_CLAIM, false}
+                    {RecoveryClaimMode.CLAIM, true},
+                    {RecoveryClaimMode.CLAIM, false},
+                    {RecoveryClaimMode.NO_CLAIM, true},
+                    {RecoveryClaimMode.NO_CLAIM, false}
                 });
     }
 
-    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = 
{1}")
+    @ParameterizedTest(name = "RecoveryClaimMode = {0}, 
fileMergingAcrossBoundary = {1}")
     @MethodSource("parameters")
     public void testSwitchFromDisablingToEnablingFileMerging(
-            RestoreMode restoreMode, boolean fileMergingAcrossBoundary, 
@TempDir Path checkpointDir)
+            RecoveryClaimMode recoveryClaimMode,
+            boolean fileMergingAcrossBoundary,
+            @TempDir Path checkpointDir)
             throws Exception {
         testSwitchingFileMerging(
-                checkpointDir, false, true, restoreMode, 
fileMergingAcrossBoundary);
+                checkpointDir, false, true, recoveryClaimMode, 
fileMergingAcrossBoundary);
     }
 
-    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = 
{1}")
+    @ParameterizedTest(name = "RecoveryClaimMode = {0}, 
fileMergingAcrossBoundary = {1}")
     @MethodSource("parameters")
     public void testSwitchFromEnablingToDisablingFileMerging(
-            RestoreMode restoreMode, boolean fileMergingAcrossBoundary, 
@TempDir Path checkpointDir)
+            RecoveryClaimMode recoveryClaimMode,
+            boolean fileMergingAcrossBoundary,
+            @TempDir Path checkpointDir)
             throws Exception {
         testSwitchingFileMerging(
-                checkpointDir, true, false, restoreMode, 
fileMergingAcrossBoundary);
+                checkpointDir, true, false, recoveryClaimMode, 
fileMergingAcrossBoundary);
     }
 
     private void testSwitchingFileMerging(
             Path checkpointDir,
             boolean firstFileMergingSwitch,
             boolean secondFileMergingSwitch,
-            RestoreMode restoreMode,
+            RecoveryClaimMode recoveryClaimMode,
             boolean fileMergingAcrossBoundary)
             throws Exception {
         final Configuration config = new Configuration();
@@ -125,7 +129,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
                             stateBackend1,
                             null,
                             firstCluster,
-                            restoreMode,
+                            recoveryClaimMode,
                             config,
                             consecutiveCheckpoint,
                             true);
@@ -155,7 +159,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
                             stateBackend2,
                             firstCheckpoint,
                             secondCluster,
-                            restoreMode,
+                            recoveryClaimMode,
                             config,
                             consecutiveCheckpoint,
                             true);
@@ -165,7 +169,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
             verifyCheckpointExistOrWaitDeleted(
                     firstCheckpoint,
                     determineFileExist(
-                            restoreMode, firstFileMergingSwitch, 
secondFileMergingSwitch),
+                            recoveryClaimMode, firstFileMergingSwitch, 
secondFileMergingSwitch),
                     firstFileMergingSwitch,
                     firstMetadata);
         } finally {
@@ -190,7 +194,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
                             stateBackend3,
                             secondCheckpoint,
                             thirdCluster,
-                            restoreMode,
+                            recoveryClaimMode,
                             config,
                             consecutiveCheckpoint,
                             true);
@@ -200,7 +204,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
             verifyCheckpointExistOrWaitDeleted(
                     secondCheckpoint,
                     determineFileExist(
-                            restoreMode, secondFileMergingSwitch, 
secondFileMergingSwitch),
+                            recoveryClaimMode, secondFileMergingSwitch, 
secondFileMergingSwitch),
                     secondFileMergingSwitch,
                     secondMetadata);
         } finally {
@@ -225,7 +229,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
                             stateBackend4,
                             thirdCheckpoint,
                             fourthCluster,
-                            restoreMode,
+                            recoveryClaimMode,
                             config,
                             consecutiveCheckpoint,
                             false);
@@ -233,7 +237,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
             verifyCheckpointExistOrWaitDeleted(
                     thirdCheckpoint,
                     determineFileExist(
-                            restoreMode, secondFileMergingSwitch, 
secondFileMergingSwitch),
+                            recoveryClaimMode, secondFileMergingSwitch, 
secondFileMergingSwitch),
                     secondFileMergingSwitch,
                     thirdMetadata);
             verifyCheckpointExistOrWaitDeleted(
@@ -274,8 +278,10 @@ public class SnapshotFileMergingCompatibilityITCase 
extends TestLogger {
     }
 
     private static TernaryBoolean determineFileExist(
-            RestoreMode mode, boolean lastFileMergingEnabled, boolean 
thisFileMergingEnabled) {
-        if (mode == RestoreMode.CLAIM) {
+            RecoveryClaimMode mode,
+            boolean lastFileMergingEnabled,
+            boolean thisFileMergingEnabled) {
+        if (mode == RecoveryClaimMode.CLAIM) {
             if (lastFileMergingEnabled || thisFileMergingEnabled) {
                 // file merging will not reference files from previous jobs.
                 return TernaryBoolean.FALSE;

Reply via email to