This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7adeecd3445 [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192)
7adeecd3445 is described below
commit 7adeecd3445947f42d3e3d1e2961b9464e910236
Author: lz <[email protected]>
AuthorDate: Tue Sep 10 10:32:06 2024 +0800
[FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192)
---
docs/static/generated/rest_v1_dispatcher.yml | 16 ++---
.../apache/flink/client/cli/CliFrontendParser.java | 16 ++---
.../apache/flink/client/cli/ProgramOptions.java | 6 +-
.../flink/client/cli/CliFrontendRunTest.java | 18 ++---
.../StandaloneApplicationClusterEntryPoint.java | 12 ++--
.../flink/configuration/StateRecoveryOptions.java | 8 +--
.../{RestoreMode.java => RecoveryClaimMode.java} | 8 +--
.../KubernetesCheckpointRecoveryFactory.java | 6 +-
.../flink/kubernetes/utils/KubernetesUtils.java | 8 +--
.../runtime/webmonitor/handlers/JarRunHandler.java | 19 +++---
.../webmonitor/handlers/JarRunRequestBody.java | 25 +++----
.../handlers/JarRunHandlerParameterTest.java | 6 +-
.../webmonitor/handlers/JarRunRequestBodyTest.java | 6 +-
.../runtime/checkpoint/CheckpointCoordinator.java | 4 +-
.../runtime/checkpoint/CheckpointProperties.java | 16 +++--
.../checkpoint/CheckpointRecoveryFactory.java | 6 +-
.../runtime/checkpoint/CompletedCheckpoint.java | 8 +--
.../EmbeddedCompletedCheckpointStore.java | 8 +--
.../PerJobCheckpointRecoveryFactory.java | 10 +--
.../StandaloneCheckpointRecoveryFactory.java | 6 +-
.../StandaloneCompletedCheckpointStore.java | 14 ++--
.../ZooKeeperCheckpointRecoveryFactory.java | 6 +-
.../cleanup/CheckpointResourcesCleanupRunner.java | 6 +-
.../EmbeddedHaServicesWithLeadershipControl.java | 4 +-
.../runtime/jobgraph/SavepointConfigOptions.java | 8 +--
.../runtime/jobgraph/SavepointRestoreSettings.java | 41 +++++++-----
.../flink/runtime/minicluster/MiniCluster.java | 6 +-
.../flink/runtime/scheduler/SchedulerUtils.java | 8 +--
.../flink/runtime/state/SharedStateRegistry.java | 13 ++--
.../runtime/state/SharedStateRegistryFactory.java | 6 +-
.../runtime/state/SharedStateRegistryImpl.java | 6 +-
.../apache/flink/runtime/state/StateBackend.java | 6 +-
.../apache/flink/runtime/util/ZooKeeperUtils.java | 8 +--
.../flink/streaming/runtime/tasks/StreamTask.java | 4 +-
.../CheckpointCoordinatorFailureTest.java | 4 +-
.../CheckpointCoordinatorRestoringTest.java | 8 ++-
.../checkpoint/CheckpointCoordinatorTest.java | 10 +--
.../CheckpointCoordinatorTriggeringTest.java | 8 +--
.../checkpoint/CompletedCheckpointTest.java | 8 ++-
.../DefaultCompletedCheckpointStoreTest.java | 4 +-
.../checkpoint/PerJobCheckpointRecoveryTest.java | 10 +--
.../TestingCheckpointRecoveryFactory.java | 4 +-
.../ZooKeeperCompletedCheckpointStoreITCase.java | 4 +-
.../ZooKeeperCompletedCheckpointStoreTest.java | 4 +-
.../dispatcher/DispatcherCleanupITCase.java | 8 +--
.../CheckpointResourcesCleanupRunnerTest.java | 4 +-
.../jobgraph/SavepointRestoreSettingsTest.java | 6 +-
.../runtime/scheduler/SchedulerUtilsTest.java | 10 +--
.../runtime/state/SharedStateRegistryTest.java | 4 +-
.../plan/nodes/exec/testutils/RestoreTestBase.java | 4 +-
.../ChangelogRecoverySwitchStateBackendITCase.java | 4 +-
.../ResumeCheckpointManuallyITCase.java | 77 ++++++++++++----------
.../test/checkpointing/SavepointFormatITCase.java | 4 +-
.../flink/test/checkpointing/SavepointITCase.java | 11 ++--
.../SnapshotFileMergingCompatibilityITCase.java | 48 ++++++++------
55 files changed, 313 insertions(+), 279 deletions(-)
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 1ea036fc53b..ccaa7c6bc24 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2288,7 +2288,7 @@ components:
allowNonRestoredState:
type: boolean
claimMode:
- $ref: '#/components/schemas/RestoreMode'
+ $ref: '#/components/schemas/RecoveryClaimMode'
entryClass:
type: string
flinkConfiguration:
@@ -2307,7 +2307,7 @@ components:
items:
type: string
restoreMode:
- $ref: '#/components/schemas/RestoreMode'
+ $ref: '#/components/schemas/RecoveryClaimMode'
savepointPath:
type: string
JarRunResponseBody:
@@ -2773,6 +2773,12 @@ components:
$ref: '#/components/schemas/Id'
RawJson:
type: object
+ RecoveryClaimMode:
+ type: string
+ enum:
+ - CLAIM
+ - NO_CLAIM
+ - LEGACY
ResourceID:
pattern: "[0-9a-f]{32}"
type: string
@@ -2806,12 +2812,6 @@ components:
- UNALIGNED_CHECKPOINT
- SAVEPOINT
- SYNC_SAVEPOINT
- RestoreMode:
- type: string
- enum:
- - CLAIM
- - NO_CLAIM
- - LEGACY
RestoredCheckpointStatistics:
type: object
properties:
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index f7c0c4c0076..6730dceb29d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -21,7 +21,7 @@ package org.apache.flink.client.cli;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.StateRecoveryOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
@@ -697,25 +697,25 @@ public class CliFrontendParser {
String savepointPath =
commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
- final RestoreMode restoreMode;
+ final RecoveryClaimMode recoveryClaimMode;
if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
- restoreMode =
+ recoveryClaimMode =
ConfigurationUtils.convertValue(
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
- RestoreMode.class);
+ RecoveryClaimMode.class);
} else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
- restoreMode =
+ recoveryClaimMode =
ConfigurationUtils.convertValue(
commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE),
- RestoreMode.class);
+ RecoveryClaimMode.class);
System.out.printf(
"The option '%s' is deprecated. Please use '%s'
instead.%n",
SAVEPOINT_RESTORE_MODE.getLongOpt(),
SAVEPOINT_CLAIM_MODE.getLongOpt());
} else {
- restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue();
+ recoveryClaimMode =
StateRecoveryOptions.RESTORE_MODE.defaultValue();
}
return SavepointRestoreSettings.forPath(
- savepointPath, allowNonRestoredState, restoreMode);
+ savepointPath, allowNonRestoredState, recoveryClaimMode);
} else {
return SavepointRestoreSettings.none();
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 2bec145bb81..7a69ad99c4d 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
@@ -139,11 +139,11 @@ public class ProgramOptions extends CommandLineOptions {
if (getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR
file.");
}
- if (savepointSettings.getRestoreMode().equals(RestoreMode.LEGACY)) {
+ if
(savepointSettings.getRecoveryClaimMode().equals(RecoveryClaimMode.LEGACY)) {
System.out.printf(
"Warning: The %s restore mode is deprecated, please use %s
or"
+ " %s mode instead.%n",
- RestoreMode.LEGACY, RestoreMode.CLAIM,
RestoreMode.NO_CLAIM);
+ RecoveryClaimMode.LEGACY, RecoveryClaimMode.CLAIM,
RecoveryClaimMode.NO_CLAIM);
}
}
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index af965f60f60..dad03880371 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.TestingClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.FlinkException;
@@ -139,35 +139,35 @@ public class CliFrontendRunTest extends
CliFrontendTestBase {
@Test
void testClaimRestoreModeParsing() throws Exception {
- testRestoreMode("-rm", "claim", RestoreMode.CLAIM);
+ testRestoreMode("-rm", "claim", RecoveryClaimMode.CLAIM);
}
@Test
void testLegacyRestoreModeParsing() throws Exception {
- testRestoreMode("-rm", "legacy", RestoreMode.LEGACY);
+ testRestoreMode("-rm", "legacy", RecoveryClaimMode.LEGACY);
}
@Test
void testNoClaimRestoreModeParsing() throws Exception {
- testRestoreMode("-rm", "no_claim", RestoreMode.NO_CLAIM);
+ testRestoreMode("-rm", "no_claim", RecoveryClaimMode.NO_CLAIM);
}
@Test
void testClaimRestoreModeParsingLongOption() throws Exception {
- testRestoreMode("--claimMode", "claim", RestoreMode.CLAIM);
+ testRestoreMode("--claimMode", "claim", RecoveryClaimMode.CLAIM);
}
@Test
void testLegacyRestoreModeParsingLongOption() throws Exception {
- testRestoreMode("--claimMode", "legacy", RestoreMode.LEGACY);
+ testRestoreMode("--claimMode", "legacy", RecoveryClaimMode.LEGACY);
}
@Test
void testNoClaimRestoreModeParsingLongOption() throws Exception {
- testRestoreMode("--claimMode", "no_claim", RestoreMode.NO_CLAIM);
+ testRestoreMode("--claimMode", "no_claim", RecoveryClaimMode.NO_CLAIM);
}
- private void testRestoreMode(String flag, String arg, RestoreMode
expectedMode)
+ private void testRestoreMode(String flag, String arg, RecoveryClaimMode
expectedMode)
throws Exception {
String[] parameters = {"-s", "expectedSavepointPath", "-n", flag, arg,
getTestJarPath()};
@@ -179,7 +179,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase
{
SavepointRestoreSettings savepointSettings =
executionOptions.getSavepointRestoreSettings();
assertThat(savepointSettings.restoreSavepoint()).isTrue();
- assertThat(savepointSettings.getRestoreMode()).isEqualTo(expectedMode);
+
assertThat(savepointSettings.getRecoveryClaimMode()).isEqualTo(expectedMode);
assertThat(savepointSettings.getRestorePath()).isEqualTo("expectedSavepointPath");
assertThat(savepointSettings.allowNonRestoredState()).isTrue();
}
diff --git
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
index 8d1f731df07..1dbd12b6cfc 100644
---
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
+++
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
@@ -28,7 +28,7 @@ import
org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
@@ -69,13 +69,13 @@ public final class StandaloneApplicationClusterEntryPoint
extends ApplicationClu
Configuration configuration =
loadConfigurationFromClusterConfig(clusterConfiguration);
if (clusterConfiguration
.getSavepointRestoreSettings()
- .getRestoreMode()
- .equals(RestoreMode.LEGACY)) {
+ .getRecoveryClaimMode()
+ .equals(RecoveryClaimMode.LEGACY)) {
LOG.warn(
"The {} restore mode is deprecated, please use {} or {}
mode instead.",
- RestoreMode.LEGACY,
- RestoreMode.CLAIM,
- RestoreMode.NO_CLAIM);
+ RecoveryClaimMode.LEGACY,
+ RecoveryClaimMode.CLAIM,
+ RecoveryClaimMode.NO_CLAIM);
}
PackagedProgram program = null;
try {
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
index 3dd0d339602..e403b0c9c12 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -57,10 +57,10 @@ public class StateRecoveryOptions {
/**
* Describes the mode how Flink should restore from the given savepoint or
retained checkpoint.
*/
- public static final ConfigOption<RestoreMode> RESTORE_MODE =
+ public static final ConfigOption<RecoveryClaimMode> RESTORE_MODE =
key("execution.state-recovery.claim-mode")
- .enumType(RestoreMode.class)
- .defaultValue(RestoreMode.DEFAULT)
+ .enumType(RecoveryClaimMode.class)
+ .defaultValue(RecoveryClaimMode.DEFAULT)
.withDeprecatedKeys("execution.savepoint-restore-mode")
.withDescription(
"Describes the mode how Flink should restore from
the given"
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/RestoreMode.java b/flink-core/src/main/java/org/apache/flink/core/execution/RecoveryClaimMode.java
similarity index 90%
rename from flink-core/src/main/java/org/apache/flink/core/execution/RestoreMode.java
rename to flink-core/src/main/java/org/apache/flink/core/execution/RecoveryClaimMode.java
index 8a4abb6e6bf..e4393890c5a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/RestoreMode.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/RecoveryClaimMode.java
@@ -27,11 +27,9 @@ import static
org.apache.flink.configuration.description.TextElement.text;
/**
* Defines state files ownership when Flink restore from a given savepoint or
retained checkpoint.
- * TODO: Rename 'RestoreMode' to 'RecoveryClaimMode' in Flink 2.0. Any related
variable names should
- * be adjusted accordingly.
*/
@PublicEvolving
-public enum RestoreMode implements DescribedEnum {
+public enum RecoveryClaimMode implements DescribedEnum {
CLAIM(
"Flink will take ownership of the given snapshot. It will clean
the"
+ " snapshot once it is subsumed by newer ones."),
@@ -51,7 +49,7 @@ public enum RestoreMode implements DescribedEnum {
private final String description;
- RestoreMode(String description) {
+ RecoveryClaimMode(String description) {
this.description = description;
}
@@ -61,5 +59,5 @@ public enum RestoreMode implements DescribedEnum {
return text(description);
}
- public static final RestoreMode DEFAULT = NO_CLAIM;
+ public static final RecoveryClaimMode DEFAULT = NO_CLAIM;
}
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index b908827ec8a..1a8468049e2 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.kubernetes.highavailability;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -83,7 +83,7 @@ public class KubernetesCheckpointRecoveryFactory implements
CheckpointRecoveryFa
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
final String configMapName = getConfigMapNameFunction.apply(jobID);
KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient,
configMapName, clusterId);
@@ -97,7 +97,7 @@ public class KubernetesCheckpointRecoveryFactory implements
CheckpointRecoveryFa
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
ioExecutor,
- restoreMode);
+ recoveryClaimMode);
}
@Override
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index d27515b8221..09ff6653d7f 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import
org.apache.flink.kubernetes.highavailability.KubernetesCheckpointStoreUtil;
import
org.apache.flink.kubernetes.highavailability.KubernetesJobGraphStoreUtil;
@@ -305,7 +305,7 @@ public class KubernetesUtils {
* @param lockIdentity lock identity to check the leadership
* @param maxNumberOfCheckpointsToRetain max number of checkpoints to
retain on state store
* handle
- * @param restoreMode the mode in which the job is restoring
+ * @param recoveryClaimMode the mode in which the job is restoring
* @return a {@link DefaultCompletedCheckpointStore} with {@link
KubernetesStateHandleStore}.
* @throws Exception when create the storage helper failed
*/
@@ -318,7 +318,7 @@ public class KubernetesUtils {
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage =
@@ -342,7 +342,7 @@ public class KubernetesUtils {
stateHandleStore,
KubernetesCheckpointStoreUtil.INSTANCE,
checkpoints,
- sharedStateRegistryFactory.create(ioExecutor, checkpoints,
restoreMode),
+ sharedStateRegistryFactory.create(ioExecutor, checkpoints,
recoveryClaimMode),
executor);
}
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index f0835b223cb..865db07fda3 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -149,27 +149,28 @@ public class JarRunHandler
request,
SavepointPathQueryParameter.class)),
effectiveConfiguration.get(StateRecoveryOptions.SAVEPOINT_PATH),
log);
- final RestoreMode restoreMode =
- Optional.ofNullable(requestBody.getRestoreMode())
+ final RecoveryClaimMode recoveryClaimMode =
+ Optional.ofNullable(requestBody.getRecoveryClaimMode())
.orElseGet(
() ->
effectiveConfiguration.get(
StateRecoveryOptions.RESTORE_MODE));
if (requestBody.isDeprecatedRestoreModeHasValue()) {
- log.warn("The option 'restoreMode' is deprecated, please use
'claimMode' instead.");
+ log.warn(
+ "The option 'restoreMode' is deprecated, please use
'recoveryClaimMode' instead.");
}
- if (restoreMode.equals(RestoreMode.LEGACY)) {
+ if (recoveryClaimMode.equals(RecoveryClaimMode.LEGACY)) {
log.warn(
"The {} restore mode is deprecated, please use {} or {}
mode instead.",
- RestoreMode.LEGACY,
- RestoreMode.CLAIM,
- RestoreMode.NO_CLAIM);
+ RecoveryClaimMode.LEGACY,
+ RecoveryClaimMode.CLAIM,
+ RecoveryClaimMode.NO_CLAIM);
}
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings =
SavepointRestoreSettings.forPath(
- savepointPath, allowNonRestoredState, restoreMode);
+ savepointPath, allowNonRestoredState,
recoveryClaimMode);
} else {
savepointRestoreSettings = SavepointRestoreSettings.none();
}
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
index cc0fbf92b4e..948d99e608f 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.rest.messages.RequestBody;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -57,11 +57,11 @@ public class JarRunRequestBody extends JarRequestBody {
@Nullable
@Deprecated
@Documentation.ExcludeFromDocumentation
- private RestoreMode deprecatedRestoreMode;
+ private RecoveryClaimMode deprecatedRecoveryClaimMode;
@JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
@Nullable
- private RestoreMode restoreMode;
+ private RecoveryClaimMode recoveryClaimMode;
public JarRunRequestBody() {
this(null, null, null, null, null, null, null, null, null, null);
@@ -77,7 +77,7 @@ public class JarRunRequestBody extends JarRequestBody {
@Nullable JobID jobId,
@Nullable Boolean allowNonRestoredState,
@Nullable String savepointPath,
- @Nullable RestoreMode restoreMode,
+ @Nullable RecoveryClaimMode recoveryClaimMode,
@Nullable Map<String, String> flinkConfiguration) {
this(
entryClassName,
@@ -88,7 +88,7 @@ public class JarRunRequestBody extends JarRequestBody {
allowNonRestoredState,
savepointPath,
null,
- restoreMode,
+ recoveryClaimMode,
flinkConfiguration);
}
@@ -104,8 +104,9 @@ public class JarRunRequestBody extends JarRequestBody {
Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String
savepointPath,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
- RestoreMode deprecatedRestoreMode,
- @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
RestoreMode restoreMode,
+ RecoveryClaimMode deprecatedRecoveryClaimMode,
+ @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
+ RecoveryClaimMode recoveryClaimMode,
@Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
Map<String, String> flinkConfiguration) {
super(
@@ -117,8 +118,8 @@ public class JarRunRequestBody extends JarRequestBody {
flinkConfiguration);
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
- this.deprecatedRestoreMode = deprecatedRestoreMode;
- this.restoreMode = restoreMode;
+ this.deprecatedRecoveryClaimMode = deprecatedRecoveryClaimMode;
+ this.recoveryClaimMode = recoveryClaimMode;
}
@Nullable
@@ -135,12 +136,12 @@ public class JarRunRequestBody extends JarRequestBody {
@Nullable
@JsonIgnore
- public RestoreMode getRestoreMode() {
- return restoreMode == null ? deprecatedRestoreMode : restoreMode;
+ public RecoveryClaimMode getRecoveryClaimMode() {
+ return recoveryClaimMode == null ? deprecatedRecoveryClaimMode :
recoveryClaimMode;
}
@JsonIgnore
public boolean isDeprecatedRestoreModeHasValue() {
- return deprecatedRestoreMode != null;
+ return deprecatedRecoveryClaimMode != null;
}
}
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 4eecb9d174a..fb4de49f2b0 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -74,7 +74,7 @@ class JarRunHandlerParameterTest
extends JarHandlerParameterTest<JarRunRequestBody,
JarRunMessageParameters> {
private static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true;
private static final String RESTORE_PATH = "/foo/bar";
- private static final RestoreMode RESTORE_MODE = RestoreMode.CLAIM;
+ private static final RecoveryClaimMode RESTORE_MODE =
RecoveryClaimMode.CLAIM;
@RegisterExtension
private static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_EXTENSION =
@@ -339,7 +339,7 @@ class JarRunHandlerParameterTest
final SavepointRestoreSettings savepointRestoreSettings =
jobGraph.getSavepointRestoreSettings();
- assertThat(savepointRestoreSettings.getRestoreMode())
+ assertThat(savepointRestoreSettings.getRecoveryClaimMode())
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.RESTORE_MODE));
assertThat(savepointRestoreSettings.getRestorePath())
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.SAVEPOINT_PATH));
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
index c53eaaaba90..5b8ceaeaf96 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
import
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
@@ -49,7 +49,7 @@ class JarRunRequestBodyTest extends
RestRequestMarshallingTestBase<JarRunRequest
new JobID(),
true,
"foo/bar",
- RestoreMode.CLAIM,
+ RecoveryClaimMode.CLAIM,
Collections.singletonMap("key", "value"));
}
@@ -64,7 +64,7 @@ class JarRunRequestBodyTest extends
RestRequestMarshallingTestBase<JarRunRequest
assertThat(actual.getAllowNonRestoredState())
.isEqualTo(expected.getAllowNonRestoredState());
assertThat(actual.getSavepointPath()).isEqualTo(expected.getSavepointPath());
-
assertThat(actual.getRestoreMode()).isEqualTo(expected.getRestoreMode());
+
assertThat(actual.getRecoveryClaimMode()).isEqualTo(expected.getRecoveryClaimMode());
assertThat(actual.getFlinkConfiguration().toMap())
.containsExactlyEntriesOf(expected.getFlinkConfiguration().toMap());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 1606cad4126..ca9209ec19c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1890,7 +1890,7 @@ public class CheckpointCoordinator {
// convert to checkpoint so the system can fall back to it
final CheckpointProperties checkpointProperties;
- switch (restoreSettings.getRestoreMode()) {
+ switch (restoreSettings.getRecoveryClaimMode()) {
case CLAIM:
checkpointProperties = this.checkpointProperties;
break;
@@ -1923,7 +1923,7 @@ public class CheckpointCoordinator {
// because the latter might trigger subsumption so the ref counts must
be up-to-date
savepoint.registerSharedStatesAfterRestored(
completedCheckpointStore.getSharedStateRegistry(),
- restoreSettings.getRestoreMode());
+ restoreSettings.getRecoveryClaimMode());
completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index d53720093ec..56d0830eaf4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import java.io.Serializable;
@@ -93,7 +93,10 @@ public class CheckpointProperties implements Serializable {
return forced;
}
- /** Returns whether the checkpoint should be restored in a {@link
RestoreMode#NO_CLAIM} mode. */
+ /**
+ * Returns whether the checkpoint should be restored in a {@link
RecoveryClaimMode#NO_CLAIM}
+ * mode.
+ */
public boolean isUnclaimed() {
return unclaimed;
}
@@ -302,11 +305,12 @@ public class CheckpointProperties implements Serializable
{
}
/**
- * Creates the checkpoint properties for a snapshot restored in {@link
RestoreMode#NO_CLAIM}.
- * Those properties should not be used when triggering a
checkpoint/savepoint. They're useful
- * when restoring a {@link CompletedCheckpointStore} after a JM failover.
+ * Creates the checkpoint properties for a snapshot restored in {@link
+ * RecoveryClaimMode#NO_CLAIM}. Those properties should not be used when
triggering a
+ * checkpoint/savepoint. They're useful when restoring a {@link
CompletedCheckpointStore} after
+ * a JM failover.
*
- * @return Checkpoint properties for a snapshot restored in {@link
RestoreMode#NO_CLAIM}.
+ * @return Checkpoint properties for a snapshot restored in {@link
RecoveryClaimMode#NO_CLAIM}.
*/
public static CheckpointProperties forUnclaimedSnapshot() {
return new CheckpointProperties(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index b06fa9427c6..288faf53f9b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -38,7 +38,7 @@ public interface CheckpointRecoveryFactory {
* @param sharedStateRegistryFactory Simple factory to produce {@link
SharedStateRegistry}
* objects.
* @param ioExecutor Executor used to run (async) deletes.
- * @param restoreMode the claim mode with which the job is restoring.
+ * @param recoveryClaimMode the claim mode with which the job is restoring.
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
@@ -46,7 +46,7 @@ public interface CheckpointRecoveryFactory {
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception;
/**
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index ae602655e61..82c688eac52 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -250,13 +250,13 @@ public class CompletedCheckpoint implements Serializable,
Checkpoint {
* is added into the store.
*
* @param sharedStateRegistry The registry where shared states are
registered
- * @param restoreMode the mode in which this checkpoint was restored from
+ * @param recoveryClaimMode the mode in which this checkpoint was restored
from
*/
public void registerSharedStatesAfterRestored(
- SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+ SharedStateRegistry sharedStateRegistry, RecoveryClaimMode
recoveryClaimMode) {
// in claim mode we should not register any shared handles
if (!props.isUnclaimed()) {
- sharedStateRegistry.registerAllAfterRestored(this, restoreMode);
+ sharedStateRegistry.registerAllAfterRestored(this,
recoveryClaimMode);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index a7ba6afd313..df80f36ccea 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
@@ -61,18 +61,18 @@ public class EmbeddedCompletedCheckpointStore extends
AbstractCompleteCheckpoint
maxRetainedCheckpoints,
Collections.emptyList(),
/* Using the default claim mode in tests to detect any
breaking changes early. */
- RestoreMode.DEFAULT);
+ RecoveryClaimMode.DEFAULT);
}
public EmbeddedCompletedCheckpointStore(
int maxRetainedCheckpoints,
Collection<CompletedCheckpoint> initialCheckpoints,
- RestoreMode restoreMode) {
+ RecoveryClaimMode recoveryClaimMode) {
this(
maxRetainedCheckpoints,
initialCheckpoints,
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), initialCheckpoints,
restoreMode));
+ Executors.directExecutor(), initialCheckpoints,
recoveryClaimMode));
}
public EmbeddedCompletedCheckpointStore(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index 7b37a641759..37eda00d60d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import javax.annotation.Nullable;
@@ -43,7 +43,7 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
public static <T extends CompletedCheckpointStore>
CheckpointRecoveryFactory
withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
return new PerJobCheckpointRecoveryFactory<>(
- (maxCheckpoints, previous, sharedStateRegistry, ioExecutor,
restoreMode) -> {
+ (maxCheckpoints, previous, sharedStateRegistry, ioExecutor,
recoveryClaimMode) -> {
if (previous != null) {
throw new UnsupportedOperationException(
"Checkpoint store recovery is not supported.");
@@ -77,7 +77,7 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode) {
+ RecoveryClaimMode recoveryClaimMode) {
return store.compute(
jobId,
(key, previous) ->
@@ -86,7 +86,7 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
previous,
sharedStateRegistryFactory,
ioExecutor,
- restoreMode));
+ recoveryClaimMode));
}
@Override
@@ -102,6 +102,6 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
@Nullable StoreType previousStore,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode);
+ RecoveryClaimMode recoveryClaimMode);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index cb78fa39442..e273151e519 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -34,14 +34,14 @@ public class StandaloneCheckpointRecoveryFactory implements
CheckpointRecoveryFa
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
return new StandaloneCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
ioExecutor,
- restoreMode);
+ recoveryClaimMode);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 801dced7a95..e4b098b0a38 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -62,13 +62,13 @@ public class StandaloneCompletedCheckpointStore extends
AbstractCompleteCheckpoi
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor(),
/* Using the default mode in tests to detect any breaking
changes early. */
- RestoreMode.DEFAULT);
+ RecoveryClaimMode.DEFAULT);
}
/**
* Creates {@link StandaloneCompletedCheckpointStore}.
*
- * @param restoreMode
+ * @param recoveryClaimMode
* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints
to retain (at least
* 1). Adding more checkpoints than this results in older checkpoints
being discarded.
*/
@@ -76,13 +76,13 @@ public class StandaloneCompletedCheckpointStore extends
AbstractCompleteCheckpoi
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode) {
+ RecoveryClaimMode recoveryClaimMode) {
this(
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1),
ioExecutor,
- restoreMode);
+ recoveryClaimMode);
}
private StandaloneCompletedCheckpointStore(
@@ -90,8 +90,8 @@ public class StandaloneCompletedCheckpointStore extends
AbstractCompleteCheckpoi
SharedStateRegistryFactory sharedStateRegistryFactory,
ArrayDeque<CompletedCheckpoint> checkpoints,
Executor ioExecutor,
- RestoreMode restoreMode) {
- super(sharedStateRegistryFactory.create(ioExecutor, checkpoints,
restoreMode));
+ RecoveryClaimMode recoveryClaimMode) {
+ super(sharedStateRegistryFactory.create(ioExecutor, checkpoints,
recoveryClaimMode));
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at
least one checkpoint.");
this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
this.checkpoints = checkpoints;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 642d5a570d9..7f7494023f1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -53,7 +53,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements
CheckpointRecoveryFac
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
return ZooKeeperUtils.createCompletedCheckpoints(
@@ -64,7 +64,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements
CheckpointRecoveryFac
sharedStateRegistryFactory,
ioExecutor,
executor,
- restoreMode);
+ recoveryClaimMode);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 59128d853b1..2211062a2ca 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
@@ -150,10 +150,10 @@ public class CheckpointResourcesCleanupRunner implements
JobManagerRunner {
jobManagerConfiguration, LOG),
sharedStateRegistryFactory,
cleanupExecutor,
- // Using RestoreMode.CLAIM to be able to discard shared state,
if any.
+ // Using RecoveryClaimMode.CLAIM to be able to discard shared
state, if any.
// Note that it also means that the original shared state
might be discarded as well
// because the initial checkpoint might be subsumed.
- RestoreMode.CLAIM);
+ RecoveryClaimMode.CLAIM);
}
private CheckpointIDCounter createCheckpointIDCounter() throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index b9859ba0f27..69d97d18577 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -44,14 +44,14 @@ public class EmbeddedHaServicesWithLeadershipControl
extends EmbeddedHaServices
previous,
stateRegistryFactory,
ioExecutor,
- restoreMode) -> {
+ recoveryClaimMode) -> {
List<CompletedCheckpoint> checkpoints =
previous != null
? previous.getAllCheckpoints()
: Collections.emptyList();
SharedStateRegistry stateRegistry =
stateRegistryFactory.create(
- ioExecutor, checkpoints,
restoreMode);
+ ioExecutor, checkpoints,
recoveryClaimMode);
if (previous != null) {
if (!previous.getShutdownStatus().isPresent())
{
throw new IllegalStateException(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index 7121f1d506f..4431ba3791f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobgraph;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -56,10 +56,10 @@ public class SavepointConfigOptions {
/**
* Describes the mode how Flink should restore from the given savepoint or
retained checkpoint.
*/
- public static final ConfigOption<RestoreMode> RESTORE_MODE =
+ public static final ConfigOption<RecoveryClaimMode> RESTORE_MODE =
key("execution.savepoint-restore-mode")
- .enumType(RestoreMode.class)
- .defaultValue(RestoreMode.DEFAULT)
+ .enumType(RecoveryClaimMode.class)
+ .defaultValue(RecoveryClaimMode.DEFAULT)
.withDescription(
"Describes the mode how Flink should restore from
the given"
+ " savepoint or retained checkpoint.");
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 663313bfb87..e7bdb2fdead 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobgraph;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateRecoveryOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import javax.annotation.Nonnull;
@@ -37,7 +37,7 @@ public class SavepointRestoreSettings implements Serializable
{
/** No restore should happen. */
private static final SavepointRestoreSettings NONE =
- new SavepointRestoreSettings(null, false, RestoreMode.NO_CLAIM);
+ new SavepointRestoreSettings(null, false,
RecoveryClaimMode.NO_CLAIM);
/** Savepoint restore path. */
private final String restorePath;
@@ -48,20 +48,22 @@ public class SavepointRestoreSettings implements
Serializable {
*/
private final boolean allowNonRestoredState;
- private final @Nonnull RestoreMode restoreMode;
+ private final @Nonnull RecoveryClaimMode recoveryClaimMode;
/**
* Creates the restore settings.
*
* @param restorePath Savepoint restore path.
* @param allowNonRestoredState Ignore unmapped state.
- * @param restoreMode how to restore from the savepoint
+ * @param recoveryClaimMode how to restore from the savepoint
*/
private SavepointRestoreSettings(
- String restorePath, boolean allowNonRestoredState, @Nonnull
RestoreMode restoreMode) {
+ String restorePath,
+ boolean allowNonRestoredState,
+ @Nonnull RecoveryClaimMode recoveryClaimMode) {
this.restorePath = restorePath;
this.allowNonRestoredState = allowNonRestoredState;
- this.restoreMode = restoreMode;
+ this.recoveryClaimMode = recoveryClaimMode;
}
/**
@@ -94,8 +96,8 @@ public class SavepointRestoreSettings implements Serializable
{
}
/** Tells how to restore from the given savepoint. */
- public @Nonnull RestoreMode getRestoreMode() {
- return restoreMode;
+ public @Nonnull RecoveryClaimMode getRecoveryClaimMode() {
+ return recoveryClaimMode;
}
@Override
@@ -110,13 +112,13 @@ public class SavepointRestoreSettings implements
Serializable {
SavepointRestoreSettings that = (SavepointRestoreSettings) o;
return allowNonRestoredState == that.allowNonRestoredState
&& Objects.equals(restorePath, that.restorePath)
- && Objects.equals(restoreMode, that.restoreMode);
+ && Objects.equals(recoveryClaimMode, that.recoveryClaimMode);
}
@Override
public int hashCode() {
int result = restorePath != null ? restorePath.hashCode() : 0;
- result = 31 * result + restoreMode.hashCode();
+ result = 31 * result + recoveryClaimMode.hashCode();
result = 31 * result + (allowNonRestoredState ? 1 : 0);
return result;
}
@@ -130,8 +132,8 @@ public class SavepointRestoreSettings implements
Serializable {
+ '\''
+ ", allowNonRestoredState="
+ allowNonRestoredState
- + ", restoreMode="
- + restoreMode
+ + ", recoveryClaimMode="
+ + recoveryClaimMode
+ ')';
} else {
return "SavepointRestoreSettings.none()";
@@ -160,9 +162,12 @@ public class SavepointRestoreSettings implements
Serializable {
}
public static SavepointRestoreSettings forPath(
- String savepointPath, boolean allowNonRestoredState, @Nonnull
RestoreMode restoreMode) {
+ String savepointPath,
+ boolean allowNonRestoredState,
+ @Nonnull RecoveryClaimMode recoveryClaimMode) {
checkNotNull(savepointPath, "Savepoint restore path.");
- return new SavepointRestoreSettings(savepointPath,
allowNonRestoredState, restoreMode);
+ return new SavepointRestoreSettings(
+ savepointPath, allowNonRestoredState, recoveryClaimMode);
}
// -------------------------- Parsing to and from a configuration object
@@ -175,7 +180,7 @@ public class SavepointRestoreSettings implements
Serializable {
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
savepointRestoreSettings.allowNonRestoredState());
configuration.set(
- StateRecoveryOptions.RESTORE_MODE,
savepointRestoreSettings.getRestoreMode());
+ StateRecoveryOptions.RESTORE_MODE,
savepointRestoreSettings.getRecoveryClaimMode());
final String savepointPath = savepointRestoreSettings.getRestorePath();
if (savepointPath != null) {
configuration.set(StateRecoveryOptions.SAVEPOINT_PATH,
savepointPath);
@@ -186,9 +191,11 @@ public class SavepointRestoreSettings implements
Serializable {
final String savepointPath =
configuration.get(StateRecoveryOptions.SAVEPOINT_PATH);
final boolean allowNonRestored =
configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
- final RestoreMode restoreMode =
configuration.get(StateRecoveryOptions.RESTORE_MODE);
+ final RecoveryClaimMode recoveryClaimMode =
+ configuration.get(StateRecoveryOptions.RESTORE_MODE);
return savepointPath == null
? SavepointRestoreSettings.none()
- : SavepointRestoreSettings.forPath(savepointPath,
allowNonRestored, restoreMode);
+ : SavepointRestoreSettings.forPath(
+ savepointPath, allowNonRestored, recoveryClaimMode);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2bfe899380d..4e26d729cbb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -33,7 +33,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.CheckpointType;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
@@ -1093,10 +1093,10 @@ public class MiniCluster implements AutoCloseableAsync {
final SavepointRestoreSettings savepointRestoreSettings =
jobGraph.getSavepointRestoreSettings();
if (overrideRestoreModeForChangelogStateBackend
- && savepointRestoreSettings.getRestoreMode() ==
RestoreMode.NO_CLAIM) {
+ && savepointRestoreSettings.getRecoveryClaimMode() ==
RecoveryClaimMode.NO_CLAIM) {
final Configuration conf = new Configuration();
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
conf);
- conf.set(StateRecoveryOptions.RESTORE_MODE, RestoreMode.LEGACY);
+ conf.set(StateRecoveryOptions.RESTORE_MODE,
RecoveryClaimMode.LEGACY);
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index 13bdcd119d8..2456f4354d2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -64,7 +64,7 @@ public final class SchedulerUtils {
ioExecutor,
log,
jobId,
- jobGraph.getSavepointRestoreSettings().getRestoreMode());
+ jobGraph.getSavepointRestoreSettings().getRecoveryClaimMode());
} catch (Exception e) {
throw new JobExecutionException(
jobId,
@@ -83,7 +83,7 @@ public final class SchedulerUtils {
Executor ioExecutor,
Logger log,
JobID jobId,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
return recoveryFactory.createRecoveredCompletedCheckpointStore(
jobId,
@@ -91,7 +91,7 @@ public final class SchedulerUtils {
jobManagerConfig, log),
SharedStateRegistry.DEFAULT_FACTORY,
ioExecutor,
- restoreMode);
+ recoveryClaimMode);
}
public static CheckpointIDCounter
createCheckpointIDCounterIfCheckpointingIsEnabled(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 2cc7de88f98..8c76fbf6d97 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import java.util.Set;
@@ -35,11 +35,12 @@ public interface SharedStateRegistry extends AutoCloseable {
/** A singleton object for the default implementation of a {@link
SharedStateRegistryFactory} */
SharedStateRegistryFactory DEFAULT_FACTORY =
- (deleteExecutor, checkpoints, restoreMode) -> {
+ (deleteExecutor, checkpoints, recoveryClaimMode) -> {
SharedStateRegistry sharedStateRegistry =
new SharedStateRegistryImpl(deleteExecutor);
for (CompletedCheckpoint checkpoint : checkpoints) {
- checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, restoreMode);
+ checkpoint.registerSharedStatesAfterRestored(
+ sharedStateRegistry, recoveryClaimMode);
}
return sharedStateRegistry;
};
@@ -87,7 +88,7 @@ public interface SharedStateRegistry extends AutoCloseable {
* Register given shared states in the registry.
*
* <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after
recovery), please use
- * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+ * {@link #registerAllAfterRestored(CompletedCheckpoint,
RecoveryClaimMode)}
*
* @param stateHandles The shared states to register.
* @param checkpointID which uses the states.
@@ -99,12 +100,12 @@ public interface SharedStateRegistry extends AutoCloseable
{
*
* <p>After recovery from an incremental checkpoint, its state should NOT
be discarded, even if
* {@link #unregisterUnusedState(long) not used} anymore (unless
recovering in {@link
- * RestoreMode#CLAIM CLAIM} mode).
+ * RecoveryClaimMode#CLAIM CLAIM} mode).
*
* <p>This should hold for both cases: when recovering from that initial
checkpoint; and from
* any subsequent checkpoint derived from it.
*/
- void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode
mode);
+ void registerAllAfterRestored(CompletedCheckpoint checkpoint,
RecoveryClaimMode mode);
void checkpointCompleted(long checkpointId);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
index 288d60ed028..be62fdcf03e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import java.util.Collection;
@@ -32,11 +32,11 @@ public interface SharedStateRegistryFactory {
*
* @param deleteExecutor executor used to run (async) deletes.
* @param checkpoints whose shared state will be registered.
- * @param restoreMode the mode in which the given checkpoints were restored
+ * @param recoveryClaimMode the mode in which the given checkpoints were
restored
* @return a SharedStateRegistry object
*/
SharedStateRegistry create(
Executor deleteExecutor,
Collection<CompletedCheckpoint> checkpoints,
- RestoreMode restoreMode);
+ RecoveryClaimMode recoveryClaimMode);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index 42995d41e7c..9dae5b1c0cc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SnapshotType.SharingFilesStrategy;
@@ -210,7 +210,7 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
}
@Override
- public void registerAllAfterRestored(CompletedCheckpoint checkpoint,
RestoreMode mode) {
+ public void registerAllAfterRestored(CompletedCheckpoint checkpoint,
RecoveryClaimMode mode) {
registerAll(checkpoint.getOperatorStates().values(),
checkpoint.getCheckpointID());
restoredCheckpointSharingStrategies.put(
checkpoint.getCheckpointID(),
@@ -222,7 +222,7 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
// checking entry.createdByCheckpointID against it on checkpoint
subsumption.
// In CLAIM mode, the shared state of the initial checkpoints must be
// discarded as soon as it becomes unused - so
highestRetainCheckpointID is not updated.
- if (mode != RestoreMode.CLAIM) {
+ if (mode != RecoveryClaimMode.CLAIM) {
highestNotClaimedCheckpointID =
Math.max(highestNotClaimedCheckpointID,
checkpoint.getCheckpointID());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index e83d057abc9..c0c20e8193b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
@@ -157,13 +157,13 @@ public interface StateBackend extends
java.io.Serializable {
}
/**
- * Tells if a state backend supports the {@link RestoreMode#NO_CLAIM} mode.
+ * Tells if a state backend supports the {@link
RecoveryClaimMode#NO_CLAIM} mode.
*
* <p>If a state backend supports {@code NO_CLAIM} mode, it should create
an independent
* snapshot when it receives {@link CheckpointType#FULL_CHECKPOINT} in
{@link
* Snapshotable#snapshot(long, long, CheckpointStreamFactory,
CheckpointOptions)}.
*
- * @return If the state backend supports {@link RestoreMode#NO_CLAIM} mode.
+ * @return If the state backend supports {@link
RecoveryClaimMode#NO_CLAIM} mode.
*/
default boolean supportsNoClaimRestoreMode() {
return false;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index c923bea12c8..d2944a95a74 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
@@ -561,7 +561,7 @@ public class ZooKeeperUtils {
* @param configuration {@link Configuration} object
* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints
to retain
* @param executor to run ZooKeeper callbacks
- * @param restoreMode the mode in which the job is being restored
+ * @param recoveryClaimMode the mode in which the job is being restored
* @return {@link DefaultCompletedCheckpointStore} instance
* @throws Exception if the completed checkpoint store cannot be created
*/
@@ -572,7 +572,7 @@ public class ZooKeeperUtils {
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
Executor executor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
checkNotNull(configuration, "Configuration");
@@ -592,7 +592,7 @@ public class ZooKeeperUtils {
ZooKeeperCheckpointStoreUtil.INSTANCE,
completedCheckpoints,
sharedStateRegistryFactory.create(
- ioExecutor, completedCheckpoints, restoreMode),
+ ioExecutor, completedCheckpoints,
recoveryClaimMode),
executor);
LOG.info(
"Initialized {} in '{}' with {}.",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e830b8a0343..e8591ad5cc8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,7 +24,7 @@ import
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTim
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.security.FlinkSecurityManager;
@@ -1475,7 +1475,7 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
"Configured state backend (%s) does not support
enforcing a full"
+ " snapshot. If you are restoring in %s
mode, please"
+ " consider choosing %s mode.",
- stateBackend, RestoreMode.NO_CLAIM,
RestoreMode.CLAIM));
+ stateBackend, RecoveryClaimMode.NO_CLAIM,
RecoveryClaimMode.CLAIM));
} else if (checkpointOptions.getCheckpointType().isSavepoint()) {
SavepointType savepointType = (SavepointType)
checkpointOptions.getCheckpointType();
if
(!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index f2ddf91c612..52cc0d1bcf2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
@@ -285,7 +285,7 @@ class CheckpointCoordinatorFailureTest {
public FailingCompletedCheckpointStore(Exception addCheckpointFailure)
{
super(
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), emptyList(),
RestoreMode.DEFAULT));
+ Executors.directExecutor(), emptyList(),
RecoveryClaimMode.DEFAULT));
this.addCheckpointFailure = addCheckpointFailure;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 484200c3676..1dd9f2f2b69 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.OperatorIDPair;
import
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
@@ -238,7 +238,9 @@ class CheckpointCoordinatorRestoringTest {
final ExecutionGraph executionGraph = createExecutionGraph(vertices);
final EmbeddedCompletedCheckpointStore store =
new EmbeddedCompletedCheckpointStore(
- completedCheckpoints.size(), completedCheckpoints,
RestoreMode.DEFAULT);
+ completedCheckpoints.size(),
+ completedCheckpoints,
+ RecoveryClaimMode.DEFAULT);
// set up the coordinator and validate the initial state
final CheckpointCoordinator coordinator =
@@ -780,7 +782,7 @@ class CheckpointCoordinatorRestoringTest {
// set up the coordinator and validate the initial state
SharedStateRegistry sharedStateRegistry =
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), emptyList(),
RestoreMode.DEFAULT);
+ Executors.directExecutor(), emptyList(),
RecoveryClaimMode.DEFAULT);
CheckpointCoordinator coord =
new CheckpointCoordinatorBuilder()
.setCompletedCheckpointStore(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1ac7cf7fe3f..c5ac3596a61 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -2911,7 +2911,7 @@ class CheckpointCoordinatorTest {
@Test
void testSharedStateRegistrationOnRestore() throws Exception {
- for (RestoreMode restoreMode : RestoreMode.values()) {
+ for (RecoveryClaimMode recoveryClaimMode : RecoveryClaimMode.values())
{
JobVertexID jobVertexID1 = new JobVertexID();
int parallelism1 = 2;
@@ -2929,7 +2929,7 @@ class CheckpointCoordinatorTest {
SharedStateRegistry.DEFAULT_FACTORY.create(
org.apache.flink.util.concurrent.Executors.directExecutor(),
checkpoints,
- restoreMode);
+ recoveryClaimMode);
final EmbeddedCompletedCheckpointStore store =
new EmbeddedCompletedCheckpointStore(10, checkpoints,
firstInstance);
@@ -3035,7 +3035,7 @@ class CheckpointCoordinatorTest {
SharedStateRegistry.DEFAULT_FACTORY.create(
org.apache.flink.util.concurrent.Executors.directExecutor(),
store.getAllCheckpoints(),
- restoreMode);
+ recoveryClaimMode);
final EmbeddedCompletedCheckpointStore secondStore =
new EmbeddedCompletedCheckpointStore(
10, store.getAllCheckpoints(), secondInstance);
@@ -3079,7 +3079,7 @@ class CheckpointCoordinatorTest {
verifyDiscard(
sharedHandlesByCheckpoint,
cpId ->
- restoreMode == RestoreMode.CLAIM && cpId == 0
+ recoveryClaimMode == RecoveryClaimMode.CLAIM &&
cpId == 0
? TernaryBoolean.TRUE
: TernaryBoolean.FALSE);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 9d611970682..12b404bf42a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -211,7 +211,7 @@ class CheckpointCoordinatorTriggeringTest {
createCheckpointCoordinator(graph, checkpointStore,
checkpointIDCounter);
checkpointCoordinator.restoreSavepoint(
SavepointRestoreSettings.forPath(
- savepoint.getExternalPointer(), true,
RestoreMode.NO_CLAIM),
+ savepoint.getExternalPointer(), true,
RecoveryClaimMode.NO_CLAIM),
graph.getAllVertices(),
this.getClass().getClassLoader());
checkpointCoordinator.shutdown();
@@ -265,7 +265,7 @@ class CheckpointCoordinatorTriggeringTest {
createCheckpointCoordinator(graph, checkpointStore,
checkpointIDCounter);
checkpointCoordinator.restoreSavepoint(
SavepointRestoreSettings.forPath(
- savepoint.getExternalPointer(), true,
RestoreMode.NO_CLAIM),
+ savepoint.getExternalPointer(), true,
RecoveryClaimMode.NO_CLAIM),
graph.getAllVertices(),
this.getClass().getClassLoader());
@@ -431,7 +431,7 @@ class CheckpointCoordinatorTriggeringTest {
createCheckpointCoordinator(graph, checkpointStore,
checkpointIDCounter);
checkpointCoordinator.restoreSavepoint(
SavepointRestoreSettings.forPath(
- savepoint.getExternalPointer(), true,
RestoreMode.NO_CLAIM),
+ savepoint.getExternalPointer(), true,
RecoveryClaimMode.NO_CLAIM),
graph.getAllVertices(),
this.getClass().getClassLoader());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index b518f3f948c..c7101adb38e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -230,7 +230,8 @@ public class CompletedCheckpointTest {
null);
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistryImpl();
- checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry,
RestoreMode.DEFAULT);
+ checkpoint.registerSharedStatesAfterRestored(
+ sharedStateRegistry, RecoveryClaimMode.DEFAULT);
verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
}
@@ -262,7 +263,8 @@ public class CompletedCheckpointTest {
null);
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistryImpl();
- checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry,
RestoreMode.DEFAULT);
+ checkpoint.registerSharedStatesAfterRestored(
+ sharedStateRegistry, RecoveryClaimMode.DEFAULT);
verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
// Subsume
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 6658718ef7e..ef947dca6fd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.persistence.StateHandleStore;
import
org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
@@ -388,7 +388,7 @@ class DefaultCompletedCheckpointStoreTest {
SharedStateRegistry.DEFAULT_FACTORY.create(
org.apache.flink.util.concurrent.Executors.directExecutor(),
emptyList(),
- RestoreMode.DEFAULT),
+ RecoveryClaimMode.DEFAULT),
executorService);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
index 29df40671db..48ba3aa8fa7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.concurrent.Executors;
@@ -49,7 +49,7 @@ class PerJobCheckpointRecoveryTest {
1,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor(),
- RestoreMode.DEFAULT))
+ RecoveryClaimMode.DEFAULT))
.isSameAs(store);
assertThatExceptionOfType(UnsupportedOperationException.class)
.isThrownBy(
@@ -59,7 +59,7 @@ class PerJobCheckpointRecoveryTest {
1,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor(),
- RestoreMode.DEFAULT));
+ RecoveryClaimMode.DEFAULT));
final JobID secondJobId = new JobID();
assertThat(
@@ -68,7 +68,7 @@ class PerJobCheckpointRecoveryTest {
1,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor(),
- RestoreMode.DEFAULT))
+ RecoveryClaimMode.DEFAULT))
.isSameAs(store);
assertThatExceptionOfType(UnsupportedOperationException.class)
.isThrownBy(
@@ -78,6 +78,6 @@ class PerJobCheckpointRecoveryTest {
1,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor(),
- RestoreMode.DEFAULT));
+ RecoveryClaimMode.DEFAULT));
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index bff19c303c7..d627bb6901d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import java.util.concurrent.Executor;
@@ -41,7 +41,7 @@ public class TestingCheckpointRecoveryFactory implements
CheckpointRecoveryFacto
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode) {
+ RecoveryClaimMode recoveryClaimMode) {
return store;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index f8fd6ad6655..37723e0feea 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
@@ -89,7 +89,7 @@ class ZooKeeperCompletedCheckpointStoreITCase extends
CompletedCheckpointStoreTe
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
checkpointsInZooKeeper, checkpointStoreUtil),
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), emptyList(),
RestoreMode.DEFAULT),
+ Executors.directExecutor(), emptyList(),
RecoveryClaimMode.DEFAULT),
executor);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index f9811a597a9..d963f9cf460 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import
org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
@@ -202,7 +202,7 @@ class ZooKeeperCompletedCheckpointStoreTest {
zooKeeperCheckpointStoreUtil,
Collections.emptyList(),
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), emptyList(),
RestoreMode.DEFAULT),
+ Executors.directExecutor(), emptyList(),
RecoveryClaimMode.DEFAULT),
Executors.directExecutor());
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 12207ed7883..70346d0ba78 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
@@ -90,7 +90,7 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
previous,
sharedStateRegistryFactory,
ioExecutor,
- restoreMode) -> {
+ recoveryClaimMode) -> {
if (previous != null) {
// First job cleanup still succeeded for the
// CompletedCheckpointStore because the
JobGraph cleanup happens
@@ -103,7 +103,7 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
sharedStateRegistryFactory.create(
ioExecutor,
previous.getAllCheckpoints(),
- restoreMode));
+ recoveryClaimMode));
}
return new EmbeddedCompletedCheckpointStore(
maxCheckpoints,
@@ -111,7 +111,7 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
sharedStateRegistryFactory.create(
ioExecutor,
Collections.emptyList(),
- RestoreMode.DEFAULT));
+ RecoveryClaimMode.DEFAULT));
}));
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
index 882177e419b..5d8f4ad7b2e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -617,7 +617,7 @@ class CheckpointResourcesCleanupRunnerTest {
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
creationLatch.await();
return completedCheckpointStore;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
index dd6b2a67b41..ba73417075a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.jobgraph;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.junit.jupiter.api.Test;
@@ -30,9 +30,9 @@ public class SavepointRestoreSettingsTest {
@Test
public void testEqualsWithDifferentRestoreMode() {
SavepointRestoreSettings claimSettings =
- SavepointRestoreSettings.forPath("/tmp", false,
RestoreMode.CLAIM);
+ SavepointRestoreSettings.forPath("/tmp", false,
RecoveryClaimMode.CLAIM);
SavepointRestoreSettings noClaimSettings =
- SavepointRestoreSettings.forPath("/tmp", false,
RestoreMode.NO_CLAIM);
+ SavepointRestoreSettings.forPath("/tmp", false,
RecoveryClaimMode.NO_CLAIM);
assertNotEquals(claimSettings, noClaimSettings);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index e5ce3166a88..79113f7fb39 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -80,7 +80,7 @@ class SchedulerUtilsTest {
Executors.directExecutor(),
log,
new JobID(),
- RestoreMode.CLAIM);
+ RecoveryClaimMode.CLAIM);
assertThat(completedCheckpointStore.getMaxNumberOfRetainedCheckpoints())
.isEqualTo(maxNumberOfCheckpointsToRetain);
@@ -106,7 +106,7 @@ class SchedulerUtilsTest {
Executors.directExecutor(),
log,
new JobID(),
- RestoreMode.CLAIM);
+ RecoveryClaimMode.CLAIM);
SharedStateRegistry sharedStateRegistry =
checkpointStore.getSharedStateRegistry();
@@ -135,13 +135,13 @@ class SchedulerUtilsTest {
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- RestoreMode restoreMode) {
+ RecoveryClaimMode recoveryClaimMode) {
List<CompletedCheckpoint> checkpoints =
singletonList(checkpoint);
return new EmbeddedCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
checkpoints,
sharedStateRegistryFactory.create(
- ioExecutor, checkpoints, RestoreMode.DEFAULT));
+ ioExecutor, checkpoints,
RecoveryClaimMode.DEFAULT));
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 39d8779c1d4..59f2f6ebefe 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
@@ -378,7 +378,7 @@ class SharedStateRegistryTest {
new TestCompletedCheckpointStorageLocation(),
null,
properties),
- RestoreMode.DEFAULT);
+ RecoveryClaimMode.DEFAULT);
}
private static class TestSharedState implements TestStreamStateHandle {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 106516b11b8..af80fd24759 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -21,7 +21,7 @@ package
org.apache.flink.table.planner.plan.nodes.exec.testutils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -278,7 +278,7 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
SavepointRestoreSettings.forPath(
getSavepointPath(program, metadata).toString(),
false,
- RestoreMode.NO_CLAIM);
+ RecoveryClaimMode.NO_CLAIM);
}
SavepointRestoreSettings.toConfiguration(restoreSettings,
settings.getConfiguration());
settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND,
"rocksdb");
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
index 7c9254d5ab6..a914ed03d28 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -176,6 +176,6 @@ public class ChangelogRecoverySwitchStateBackendITCase
extends ChangelogRecovery
private void setSavepointRestoreSettings(JobGraph jobGraph, String
restorePath) {
jobGraph.setSavepointRestoreSettings(
- SavepointRestoreSettings.forPath(restorePath, false,
RestoreMode.CLAIM));
+ SavepointRestoreSettings.forPath(restorePath, false,
RecoveryClaimMode.CLAIM));
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 8f86af079d4..9c51b73e3df 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -33,7 +33,7 @@ import
org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
@@ -83,11 +83,11 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
private static final int SLOTS_PER_TASK_MANAGER = 2;
- @Parameterized.Parameter public RestoreMode restoreMode;
+ @Parameterized.Parameter public RecoveryClaimMode recoveryClaimMode;
- @Parameterized.Parameters(name = "RestoreMode = {0}")
+ @Parameterized.Parameters(name = "RecoveryClaimMode = {0}")
public static Object[] parameters() {
- return RestoreMode.values();
+ return RecoveryClaimMode.values();
}
@ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
@@ -100,7 +100,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
null,
createRocksDBStateBackend(checkpointDir, true),
false,
- restoreMode);
+ recoveryClaimMode);
}
@Test
@@ -111,7 +111,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
null,
createRocksDBStateBackend(checkpointDir, false),
false,
- restoreMode);
+ recoveryClaimMode);
}
@Test
@@ -123,7 +123,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
null,
createRocksDBStateBackend(checkpointDir, true),
true,
- restoreMode);
+ recoveryClaimMode);
}
@Test
@@ -135,21 +135,21 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
null,
createRocksDBStateBackend(checkpointDir, false),
true,
- restoreMode);
+ recoveryClaimMode);
}
@Test
public void testExternalizedFSCheckpointsStandalone() throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createFsStateBackend(checkpointDir),
false, restoreMode);
+ checkpointDir, null, createFsStateBackend(checkpointDir),
false, recoveryClaimMode);
}
@Test
public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone()
throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createFsStateBackend(checkpointDir),
true, restoreMode);
+ checkpointDir, null, createFsStateBackend(checkpointDir),
true, recoveryClaimMode);
}
@Test
@@ -161,7 +161,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, true),
false,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -174,7 +174,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, false),
false,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -188,7 +188,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, true),
true,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -202,7 +202,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, false),
true,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -215,7 +215,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
zkServer.getConnectString(),
createFsStateBackend(checkpointDir),
false,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -228,7 +228,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
zkServer.getConnectString(),
createFsStateBackend(checkpointDir),
true,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -244,7 +244,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
newStateBackend,
previousStateBackend,
false,
- restoreMode);
+ recoveryClaimMode);
}
@Test
@@ -260,7 +260,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
newStateBackend,
previousStateBackend,
true,
- restoreMode);
+ recoveryClaimMode);
}
@Test
@@ -276,7 +276,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
newStateBackend,
previousStateBackend,
false,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -294,7 +294,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
newStateBackend,
previousStateBackend,
true,
- restoreMode);
+ recoveryClaimMode);
}
}
@@ -313,7 +313,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
String zooKeeperQuorum,
StateBackend backend,
boolean localRecovery,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
testExternalizedCheckpoints(
checkpointDir,
@@ -322,7 +322,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
backend,
backend,
localRecovery,
- restoreMode);
+ recoveryClaimMode);
}
private static void testExternalizedCheckpoints(
@@ -332,7 +332,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
StateBackend backend2,
StateBackend backend3,
boolean localRecovery,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
final Configuration config = new Configuration();
@@ -371,12 +371,12 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
try {
// main test sequence: start job -> eCP -> restore job -> eCP ->
restore job
String firstExternalCheckpoint =
- runJobAndGetExternalizedCheckpoint(backend1, null,
cluster, restoreMode);
+ runJobAndGetExternalizedCheckpoint(backend1, null,
cluster, recoveryClaimMode);
assertNotNull(firstExternalCheckpoint);
String secondExternalCheckpoint =
runJobAndGetExternalizedCheckpoint(
- backend2, firstExternalCheckpoint, cluster,
restoreMode);
+ backend2, firstExternalCheckpoint, cluster,
recoveryClaimMode);
assertNotNull(secondExternalCheckpoint);
String thirdExternalCheckpoint =
@@ -385,11 +385,11 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
// in CLAIM mode, the previous run is only
guaranteed to preserve the
// latest checkpoint; in NO_CLAIM/LEGACY, even the
initial checkpoints
// must remain valid
- restoreMode == RestoreMode.CLAIM
+ recoveryClaimMode == RecoveryClaimMode.CLAIM
? secondExternalCheckpoint
: firstExternalCheckpoint,
cluster,
- restoreMode);
+ recoveryClaimMode);
assertNotNull(thirdExternalCheckpoint);
} finally {
cluster.after();
@@ -400,24 +400,35 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
StateBackend backend,
@Nullable String externalCheckpoint,
MiniClusterWithClientResource cluster,
- RestoreMode restoreMode)
+ RecoveryClaimMode recoveryClaimMode)
throws Exception {
// complete at least two checkpoints so that the initial checkpoint
can be subsumed
return runJobAndGetExternalizedCheckpoint(
- backend, externalCheckpoint, cluster, restoreMode, new
Configuration(), 2, true);
+ backend,
+ externalCheckpoint,
+ cluster,
+ recoveryClaimMode,
+ new Configuration(),
+ 2,
+ true);
}
static String runJobAndGetExternalizedCheckpoint(
StateBackend backend,
@Nullable String externalCheckpoint,
MiniClusterWithClientResource cluster,
- RestoreMode restoreMode,
+ RecoveryClaimMode recoveryClaimMode,
Configuration jobConfig,
int consecutiveCheckpoints,
boolean retainCheckpoints)
throws Exception {
JobGraph initialJobGraph =
- getJobGraph(backend, externalCheckpoint, restoreMode,
jobConfig, retainCheckpoints);
+ getJobGraph(
+ backend,
+ externalCheckpoint,
+ recoveryClaimMode,
+ jobConfig,
+ retainCheckpoints);
NotifyingInfiniteTupleSource.countDownLatch = new
CountDownLatch(PARALLELISM);
cluster.getClusterClient().submitJob(initialJobGraph).get();
@@ -440,7 +451,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
private static JobGraph getJobGraph(
StateBackend backend,
@Nullable String externalCheckpoint,
- RestoreMode restoreMode,
+ RecoveryClaimMode recoveryClaimMode,
Configuration jobConfig,
boolean retainCheckpoints) {
final StreamExecutionEnvironment env =
@@ -470,7 +481,7 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
// recover from previous iteration?
if (externalCheckpoint != null) {
jobGraph.setSavepointRestoreSettings(
- SavepointRestoreSettings.forPath(externalCheckpoint,
false, restoreMode));
+ SavepointRestoreSettings.forPath(externalCheckpoint,
false, recoveryClaimMode));
}
return jobGraph;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index b106011c085..e0214bf32f0 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
@@ -296,7 +296,7 @@ public class SavepointFormatITCase extends TestLogger {
final JobGraph jobGraph = createJobGraph(config);
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(
- renamedSavepointDir.toUri().toString(), false,
RestoreMode.CLAIM));
+ renamedSavepointDir.toUri().toString(), false,
RecoveryClaimMode.CLAIM));
final JobID jobId = jobGraph.getJobID();
ClusterClient<?> client = cluster.getClusterClient();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index c8ed8d2807f..d2fd4fed8e2 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -421,7 +421,7 @@ public class SavepointITCase extends TestLogger {
restoreJobAndVerifyState(
clusterFactory,
parallelism,
- SavepointRestoreSettings.forPath(savepointPath, false,
RestoreMode.CLAIM),
+ SavepointRestoreSettings.forPath(savepointPath, false,
RecoveryClaimMode.CLAIM),
cluster -> {
cluster.after();
@@ -446,7 +446,7 @@ public class SavepointITCase extends TestLogger {
restoreJobAndVerifyState(
clusterFactory,
parallelism,
- SavepointRestoreSettings.forPath(savepointPath, false,
RestoreMode.LEGACY),
+ SavepointRestoreSettings.forPath(savepointPath, false,
RecoveryClaimMode.LEGACY),
cluster -> {
cluster.after();
@@ -533,7 +533,8 @@ public class SavepointITCase extends TestLogger {
cluster.getClusterClient().cancel(jobID1).get();
jobGraph.setSavepointRestoreSettings(
- SavepointRestoreSettings.forPath(firstCheckpoint, false,
RestoreMode.NO_CLAIM));
+ SavepointRestoreSettings.forPath(
+ firstCheckpoint, false,
RecoveryClaimMode.NO_CLAIM));
final JobID jobID2 = new JobID();
jobGraph.setJobID(jobID2);
cluster.getClusterClient().submitJob(jobGraph).get();
@@ -548,7 +549,7 @@ public class SavepointITCase extends TestLogger {
// on top of the first checkpoint
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(
- secondCheckpoint, false, RestoreMode.NO_CLAIM));
+ secondCheckpoint, false,
RecoveryClaimMode.NO_CLAIM));
final JobID jobID3 = new JobID();
jobGraph.setJobID(jobID3);
cluster.getClusterClient().submitJob(jobGraph).get();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
index 1030c314405..e5f5a0a19ac 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -67,36 +67,40 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[][] {
- {RestoreMode.CLAIM, true},
- {RestoreMode.CLAIM, false},
- {RestoreMode.NO_CLAIM, true},
- {RestoreMode.NO_CLAIM, false}
+ {RecoveryClaimMode.CLAIM, true},
+ {RecoveryClaimMode.CLAIM, false},
+ {RecoveryClaimMode.NO_CLAIM, true},
+ {RecoveryClaimMode.NO_CLAIM, false}
});
}
- @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary =
{1}")
+ @ParameterizedTest(name = "RecoveryClaimMode = {0},
fileMergingAcrossBoundary = {1}")
@MethodSource("parameters")
public void testSwitchFromDisablingToEnablingFileMerging(
- RestoreMode restoreMode, boolean fileMergingAcrossBoundary,
@TempDir Path checkpointDir)
+ RecoveryClaimMode recoveryClaimMode,
+ boolean fileMergingAcrossBoundary,
+ @TempDir Path checkpointDir)
throws Exception {
testSwitchingFileMerging(
- checkpointDir, false, true, restoreMode,
fileMergingAcrossBoundary);
+ checkpointDir, false, true, recoveryClaimMode,
fileMergingAcrossBoundary);
}
- @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary =
{1}")
+ @ParameterizedTest(name = "RecoveryClaimMode = {0},
fileMergingAcrossBoundary = {1}")
@MethodSource("parameters")
public void testSwitchFromEnablingToDisablingFileMerging(
- RestoreMode restoreMode, boolean fileMergingAcrossBoundary,
@TempDir Path checkpointDir)
+ RecoveryClaimMode recoveryClaimMode,
+ boolean fileMergingAcrossBoundary,
+ @TempDir Path checkpointDir)
throws Exception {
testSwitchingFileMerging(
- checkpointDir, true, false, restoreMode,
fileMergingAcrossBoundary);
+ checkpointDir, true, false, recoveryClaimMode,
fileMergingAcrossBoundary);
}
private void testSwitchingFileMerging(
Path checkpointDir,
boolean firstFileMergingSwitch,
boolean secondFileMergingSwitch,
- RestoreMode restoreMode,
+ RecoveryClaimMode recoveryClaimMode,
boolean fileMergingAcrossBoundary)
throws Exception {
final Configuration config = new Configuration();
@@ -125,7 +129,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
stateBackend1,
null,
firstCluster,
- restoreMode,
+ recoveryClaimMode,
config,
consecutiveCheckpoint,
true);
@@ -155,7 +159,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
stateBackend2,
firstCheckpoint,
secondCluster,
- restoreMode,
+ recoveryClaimMode,
config,
consecutiveCheckpoint,
true);
@@ -165,7 +169,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
verifyCheckpointExistOrWaitDeleted(
firstCheckpoint,
determineFileExist(
- restoreMode, firstFileMergingSwitch,
secondFileMergingSwitch),
+ recoveryClaimMode, firstFileMergingSwitch,
secondFileMergingSwitch),
firstFileMergingSwitch,
firstMetadata);
} finally {
@@ -190,7 +194,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
stateBackend3,
secondCheckpoint,
thirdCluster,
- restoreMode,
+ recoveryClaimMode,
config,
consecutiveCheckpoint,
true);
@@ -200,7 +204,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
verifyCheckpointExistOrWaitDeleted(
secondCheckpoint,
determineFileExist(
- restoreMode, secondFileMergingSwitch,
secondFileMergingSwitch),
+ recoveryClaimMode, secondFileMergingSwitch,
secondFileMergingSwitch),
secondFileMergingSwitch,
secondMetadata);
} finally {
@@ -225,7 +229,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
stateBackend4,
thirdCheckpoint,
fourthCluster,
- restoreMode,
+ recoveryClaimMode,
config,
consecutiveCheckpoint,
false);
@@ -233,7 +237,7 @@ public class SnapshotFileMergingCompatibilityITCase extends
TestLogger {
verifyCheckpointExistOrWaitDeleted(
thirdCheckpoint,
determineFileExist(
- restoreMode, secondFileMergingSwitch,
secondFileMergingSwitch),
+ recoveryClaimMode, secondFileMergingSwitch,
secondFileMergingSwitch),
secondFileMergingSwitch,
thirdMetadata);
verifyCheckpointExistOrWaitDeleted(
@@ -274,8 +278,10 @@ public class SnapshotFileMergingCompatibilityITCase
extends TestLogger {
}
private static TernaryBoolean determineFileExist(
- RestoreMode mode, boolean lastFileMergingEnabled, boolean
thisFileMergingEnabled) {
- if (mode == RestoreMode.CLAIM) {
+ RecoveryClaimMode mode,
+ boolean lastFileMergingEnabled,
+ boolean thisFileMergingEnabled) {
+ if (mode == RecoveryClaimMode.CLAIM) {
if (lastFileMergingEnabled || thisFileMergingEnabled) {
// file merging will not reference files from previous jobs.
return TernaryBoolean.FALSE;