This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 5afff68c89c [FLINK-26985][runtime] Don't discard shared state of restored checkpoints
5afff68c89c is described below
commit 5afff68c89c5e51a55f135109089c22d073b507f
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Apr 1 16:08:32 2022 +0200
[FLINK-26985][runtime] Don't discard shared state of restored checkpoints
Currently, in LEGACY restore mode, the shared state of
incremental checkpoints can be discarded regardless of
whether it was created by this job. This invalidates the
checkpoint from which the job was restored.
The bug was introduced in FLINK-24611. Before that, a
reference count was maintained for each shared state
entry; "initial" checkpoints did not decrement this
count, which prevented their shared state from being discarded.
This change makes SharedStateRegistry:
1. Remember the max checkpoint ID encountered during recovery
2. Associate each shared state entry with the checkpoint ID that created it
3. Only discard an entry if its createdByCheckpointID >
highestRetainCheckpointID (see the sketch below)
(1) is called from:
- CheckpointCoordinator.restoreSavepoint - to cover the initial restore from a
checkpoint
- SharedStateRegistryFactory, when building the checkpoint store - to cover the
failover case
(see DefaultExecutionGraphFactory.createAndRestoreExecutionGraph)
Adjusting only the CheckpointCoordinator path isn't sufficient:
- the job recovers from an existing checkpoint and adds it to the store
- a new checkpoint is created - with the default restore settings
- a failure happens, the job recovers from a newer checkpoint
- when the newer checkpoint is subsumed, its (inherited) shared state
might be deleted
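A minimal, self-contained sketch of the new rule (illustration only, not the actual
Flink code: Entry, stateKeys and deleteFromStorage are hypothetical simplifications,
while RestoreMode, createdByCheckpointID and highestNotClaimedCheckpointID mirror the
diff below):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    /** Illustration only: mirrors the idea behind SharedStateRegistryImpl, not the real class. */
    class SharedStateDiscardSketch {

        enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

        private static final class Entry {
            final long createdByCheckpointID; // checkpoint that first produced this state
            long lastUsedCheckpointID;        // latest checkpoint still referencing it

            Entry(long checkpointID) {
                this.createdByCheckpointID = checkpointID;
                this.lastUsedCheckpointID = checkpointID;
            }
        }

        private final Map<String, Entry> entries = new HashMap<>();

        // State created up to (and including) this checkpoint ID was inherited from a
        // restored checkpoint and must not be deleted (unless restored in CLAIM mode).
        private long highestNotClaimedCheckpointID = -1L;

        /** (2) Each shared state entry is associated with the checkpoint ID that created it. */
        void registerAll(Iterable<String> stateKeys, long checkpointID) {
            for (String key : stateKeys) {
                Entry entry = entries.computeIfAbsent(key, k -> new Entry(checkpointID));
                entry.lastUsedCheckpointID = Math.max(entry.lastUsedCheckpointID, checkpointID);
            }
        }

        /** (1) During recovery, remember the max restored checkpoint ID (except in CLAIM mode). */
        void registerAllAfterRestored(Iterable<String> stateKeys, long checkpointID, RestoreMode mode) {
            registerAll(stateKeys, checkpointID);
            if (mode != RestoreMode.CLAIM) {
                highestNotClaimedCheckpointID = Math.max(highestNotClaimedCheckpointID, checkpointID);
            }
        }

        /** (3) On subsumption, physically discard an entry only if this job run created it. */
        void unregisterUnusedState(long lowestRetainedCheckpointID) {
            Iterator<Entry> it = entries.values().iterator();
            while (it.hasNext()) {
                Entry entry = it.next();
                if (entry.lastUsedCheckpointID < lowestRetainedCheckpointID) {
                    if (entry.createdByCheckpointID > highestNotClaimedCheckpointID) {
                        deleteFromStorage(entry); // safe: created by this job run
                    }
                    it.remove(); // the registry forgets the entry either way
                }
            }
        }

        private void deleteFromStorage(Entry entry) {
            // Stand-in for the asynchronous delete performed by the real registry.
        }
    }

Before this change the createdByCheckpointID check did not exist, so state inherited
from the restored checkpoint was deleted together with state produced by the current run.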
---
.../KubernetesCheckpointRecoveryFactory.java | 7 +-
.../flink/kubernetes/utils/KubernetesUtils.java | 7 +-
.../runtime/checkpoint/CheckpointCoordinator.java | 3 +-
.../checkpoint/CheckpointRecoveryFactory.java | 5 +-
.../runtime/checkpoint/CompletedCheckpoint.java | 7 +-
.../EmbeddedCompletedCheckpointStore.java | 13 +-
.../PerJobCheckpointRecoveryFactory.java | 12 +-
.../StandaloneCheckpointRecoveryFactory.java | 9 +-
.../StandaloneCompletedCheckpointStore.java | 17 +-
.../ZooKeeperCheckpointRecoveryFactory.java | 7 +-
.../cleanup/CheckpointResourcesCleanupRunner.java | 7 +-
.../EmbeddedHaServicesWithLeadershipControl.java | 9 +-
.../apache/flink/runtime/jobgraph/RestoreMode.java | 2 +
.../runtime/jobgraph/SavepointConfigOptions.java | 2 +-
.../flink/runtime/scheduler/SchedulerUtils.java | 14 +-
.../flink/runtime/state/SharedStateRegistry.java | 20 +-
.../runtime/state/SharedStateRegistryFactory.java | 8 +-
.../runtime/state/SharedStateRegistryImpl.java | 28 ++-
.../apache/flink/runtime/util/ZooKeeperUtils.java | 8 +-
.../CheckpointCoordinatorFailureTest.java | 3 +-
.../CheckpointCoordinatorRestoringTest.java | 6 +-
.../checkpoint/CheckpointCoordinatorTest.java | 266 +++++++++++----------
.../checkpoint/CompletedCheckpointTest.java | 5 +-
.../DefaultCompletedCheckpointStoreTest.java | 5 +-
.../checkpoint/PerJobCheckpointRecoveryTest.java | 13 +-
.../TestingCheckpointRecoveryFactory.java | 4 +-
.../ZooKeeperCompletedCheckpointStoreITCase.java | 4 +-
.../ZooKeeperCompletedCheckpointStoreTest.java | 4 +-
.../dispatcher/DispatcherCleanupITCase.java | 15 +-
.../CheckpointResourcesCleanupRunnerTest.java | 4 +-
.../runtime/scheduler/SchedulerUtilsTest.java | 13 +-
.../flink/runtime/testutils/CommonTestUtils.java | 34 ++-
.../ResumeCheckpointManuallyITCase.java | 181 +++++++-------
.../test/state/ChangelogCompatibilityITCase.java | 3 +-
.../flink/test/state/ChangelogRescalingITCase.java | 3 +-
.../java/org/apache/flink/test/util/TestUtils.java | 9 +
36 files changed, 471 insertions(+), 286 deletions(-)
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index ea78ecbbd28..7150034bbb8 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import javax.annotation.Nullable;
@@ -81,7 +82,8 @@ public class KubernetesCheckpointRecoveryFactory implements
CheckpointRecoveryFa
JobID jobID,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor)
+ Executor ioExecutor,
+ RestoreMode restoreMode)
throws Exception {
final String configMapName = getConfigMapNameFunction.apply(jobID);
KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient,
configMapName, clusterId);
@@ -94,7 +96,8 @@ public class KubernetesCheckpointRecoveryFactory implements
CheckpointRecoveryFa
lockIdentity,
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
- ioExecutor);
+ ioExecutor,
+ restoreMode);
}
@Override
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index e02d3a824e2..f8afbafde0a 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -37,6 +37,7 @@ import
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.NoOpJobGraphStoreWatcher;
@@ -296,6 +297,7 @@ public class KubernetesUtils {
* @param lockIdentity lock identity to check the leadership
* @param maxNumberOfCheckpointsToRetain max number of checkpoints to
retain on state store
* handle
+ * @param restoreMode the mode in which the job is restoring
* @return a {@link DefaultCompletedCheckpointStore} with {@link
KubernetesStateHandleStore}.
* @throws Exception when create the storage helper failed
*/
@@ -307,7 +309,8 @@ public class KubernetesUtils {
@Nullable String lockIdentity,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor)
+ Executor ioExecutor,
+ RestoreMode restoreMode)
throws Exception {
final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage =
@@ -331,7 +334,7 @@ public class KubernetesUtils {
stateHandleStore,
KubernetesCheckpointStoreUtil.INSTANCE,
checkpoints,
- sharedStateRegistryFactory.create(ioExecutor, checkpoints),
+ sharedStateRegistryFactory.create(ioExecutor, checkpoints,
restoreMode),
executor);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2efc034ca90..72a6b7032de 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1780,7 +1780,8 @@ public class CheckpointCoordinator {
// register shared state - even before adding the checkpoint to the
store
// because the latter might trigger subsumption so the ref counts must
be up-to-date
savepoint.registerSharedStatesAfterRestored(
- completedCheckpointStore.getSharedStateRegistry());
+ completedCheckpointStore.getSharedStateRegistry(),
+ restoreSettings.getRestoreMode());
completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index ab0b5ef8506..64c68caa8a9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -37,13 +38,15 @@ public interface CheckpointRecoveryFactory {
* @param sharedStateRegistryFactory Simple factory to produce {@link
SharedStateRegistry}
* objects.
* @param ioExecutor Executor used to run (async) deletes.
+ * @param restoreMode the restore mode with which the job is restoring.
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor)
+ Executor ioExecutor,
+ RestoreMode restoreMode)
throws Exception;
/**
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 41c577452bb..f0270ab661a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateUtil;
@@ -207,11 +208,13 @@ public class CompletedCheckpoint implements Serializable,
Checkpoint {
* checkpoint is added into the store.
*
* @param sharedStateRegistry The registry where shared states are
registered
+ * @param restoreMode the mode in which this checkpoint was restored from
*/
- public void registerSharedStatesAfterRestored(SharedStateRegistry
sharedStateRegistry) {
+ public void registerSharedStatesAfterRestored(
+ SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
// in claim mode we should not register any shared handles
if (!props.isUnclaimed()) {
- sharedStateRegistry.registerAll(operatorStates.values(),
checkpointID);
+ sharedStateRegistry.registerAllAfterRestored(this, restoreMode);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 1e5e47c8855..744083ca6af 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
@@ -53,16 +54,22 @@ public class EmbeddedCompletedCheckpointStore extends
AbstractCompleteCheckpoint
@VisibleForTesting
public EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) {
- this(maxRetainedCheckpoints, Collections.emptyList());
+ this(
+ maxRetainedCheckpoints,
+ Collections.emptyList(),
+ /* Using the default restore mode in tests to detect any
breaking changes early. */
+ RestoreMode.DEFAULT);
}
public EmbeddedCompletedCheckpointStore(
- int maxRetainedCheckpoints, Collection<CompletedCheckpoint>
initialCheckpoints) {
+ int maxRetainedCheckpoints,
+ Collection<CompletedCheckpoint> initialCheckpoints,
+ RestoreMode restoreMode) {
this(
maxRetainedCheckpoints,
initialCheckpoints,
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), initialCheckpoints));
+ Executors.directExecutor(), initialCheckpoints,
restoreMode));
}
public EmbeddedCompletedCheckpointStore(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index bc18c453969..7a70f5624a5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import javax.annotation.Nullable;
@@ -42,7 +43,7 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
public static <T extends CompletedCheckpointStore>
CheckpointRecoveryFactory
withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
return new PerJobCheckpointRecoveryFactory<>(
- (maxCheckpoints, previous, sharedStateRegistry, ioExecutor) ->
{
+ (maxCheckpoints, previous, sharedStateRegistry, ioExecutor,
restoreMode) -> {
if (previous != null) {
throw new UnsupportedOperationException(
"Checkpoint store recovery is not supported.");
@@ -75,7 +76,8 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ RestoreMode restoreMode) {
return store.compute(
jobId,
(key, previous) ->
@@ -83,7 +85,8 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
maxNumberOfCheckpointsToRetain,
previous,
sharedStateRegistryFactory,
- ioExecutor));
+ ioExecutor,
+ restoreMode));
}
@Override
@@ -98,6 +101,7 @@ public class PerJobCheckpointRecoveryFactory<T extends
CompletedCheckpointStore>
int maxNumberOfCheckpointsToRetain,
@Nullable StoreType previousStore,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor);
+ Executor ioExecutor,
+ RestoreMode restoreMode);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 95f9da72406..abcb704ad7c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -32,11 +33,15 @@ public class StandaloneCheckpointRecoveryFactory implements
CheckpointRecoveryFa
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor)
+ Executor ioExecutor,
+ RestoreMode restoreMode)
throws Exception {
return new StandaloneCompletedCheckpointStore(
- maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory,
ioExecutor);
+ maxNumberOfCheckpointsToRetain,
+ sharedStateRegistryFactory,
+ ioExecutor,
+ restoreMode);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 87a6486a911..6c89dcc7127 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -56,32 +57,38 @@ public class StandaloneCompletedCheckpointStore extends
AbstractCompleteCheckpoi
this(
maxNumberOfCheckpointsToRetain,
SharedStateRegistry.DEFAULT_FACTORY,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ /* Using the default restore mode in tests to detect any
breaking changes early. */
+ RestoreMode.DEFAULT);
}
/**
* Creates {@link StandaloneCompletedCheckpointStore}.
*
+ * @param restoreMode
* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints
to retain (at least
* 1). Adding more checkpoints than this results in older checkpoints
being discarded.
*/
public StandaloneCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ RestoreMode restoreMode) {
this(
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1),
- ioExecutor);
+ ioExecutor,
+ restoreMode);
}
private StandaloneCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
ArrayDeque<CompletedCheckpoint> checkpoints,
- Executor ioExecutor) {
- super(sharedStateRegistryFactory.create(ioExecutor, checkpoints));
+ Executor ioExecutor,
+ RestoreMode restoreMode) {
+ super(sharedStateRegistryFactory.create(ioExecutor, checkpoints,
restoreMode));
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at
least one checkpoint.");
this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
this.checkpoints = checkpoints;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 052b3405235..c522296cf89 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -51,7 +52,8 @@ public class ZooKeeperCheckpointRecoveryFactory implements
CheckpointRecoveryFac
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor)
+ Executor ioExecutor,
+ RestoreMode restoreMode)
throws Exception {
return ZooKeeperUtils.createCompletedCheckpoints(
@@ -61,7 +63,8 @@ public class ZooKeeperCheckpointRecoveryFactory implements
CheckpointRecoveryFac
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
ioExecutor,
- executor);
+ executor,
+ restoreMode);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index c97f8644a1f..e657c8d7400 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -30,6 +30,7 @@ import
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
import
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -145,7 +146,11 @@ public class CheckpointResourcesCleanupRunner implements
JobManagerRunner {
DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints(
jobManagerConfiguration, LOG),
sharedStateRegistryFactory,
- cleanupExecutor);
+ cleanupExecutor,
+ // Using RestoreMode.CLAIM to be able to discard shared state,
if any.
+ // Note that it also means that the original shared state
might be discarded as well
+ // because the initial checkpoint might be subsumed.
+ RestoreMode.CLAIM);
}
private CheckpointIDCounter createCheckpointIDCounter() throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index ce59d0d99e9..b9859ba0f27 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -40,13 +40,18 @@ public class EmbeddedHaServicesWithLeadershipControl
extends EmbeddedHaServices
this(
executor,
new
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
- (maxCheckpoints, previous, stateRegistryFactory,
ioExecutor) -> {
+ (maxCheckpoints,
+ previous,
+ stateRegistryFactory,
+ ioExecutor,
+ restoreMode) -> {
List<CompletedCheckpoint> checkpoints =
previous != null
? previous.getAllCheckpoints()
: Collections.emptyList();
SharedStateRegistry stateRegistry =
- stateRegistryFactory.create(ioExecutor,
checkpoints);
+ stateRegistryFactory.create(
+ ioExecutor, checkpoints,
restoreMode);
if (previous != null) {
if (!previous.getShutdownStatus().isPresent())
{
throw new IllegalStateException(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
index 10a4f2ac60d..da6c325265a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
@@ -53,4 +53,6 @@ public enum RestoreMode implements DescribedEnum {
public InlineElement getDescription() {
return text(description);
}
+
+ public static final RestoreMode DEFAULT = NO_CLAIM;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index e8a9dd86d1c..a38e05c5f3f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -52,7 +52,7 @@ public class SavepointConfigOptions {
public static final ConfigOption<RestoreMode> RESTORE_MODE =
key("execution.savepoint-restore-mode")
.enumType(RestoreMode.class)
- .defaultValue(RestoreMode.NO_CLAIM)
+ .defaultValue(RestoreMode.DEFAULT)
.withDescription(
"Describes the mode how Flink should restore from
the given"
+ " savepoint or retained checkpoint.");
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index 6b14801d8eb..87f2e56c6ba 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -30,6 +30,7 @@ import
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.slf4j.Logger;
@@ -55,7 +56,12 @@ public final class SchedulerUtils {
if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
try {
return createCompletedCheckpointStore(
- configuration, checkpointRecoveryFactory, ioExecutor,
log, jobId);
+ configuration,
+ checkpointRecoveryFactory,
+ ioExecutor,
+ log,
+ jobId,
+
jobGraph.getSavepointRestoreSettings().getRestoreMode());
} catch (Exception e) {
throw new JobExecutionException(
jobId,
@@ -73,14 +79,16 @@ public final class SchedulerUtils {
CheckpointRecoveryFactory recoveryFactory,
Executor ioExecutor,
Logger log,
- JobID jobId)
+ JobID jobId,
+ RestoreMode restoreMode)
throws Exception {
return recoveryFactory.createRecoveredCompletedCheckpointStore(
jobId,
DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints(
jobManagerConfig, log),
SharedStateRegistry.DEFAULT_FACTORY,
- ioExecutor);
+ ioExecutor,
+ restoreMode);
}
public static CheckpointIDCounter
createCheckpointIDCounterIfCheckpointingIsEnabled(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 7172bec4c24..b816f09e767 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
/**
* This registry manages state that is shared across (incremental)
checkpoints, and is responsible
@@ -32,11 +33,11 @@ public interface SharedStateRegistry extends AutoCloseable {
/** A singleton object for the default implementation of a {@link
SharedStateRegistryFactory} */
SharedStateRegistryFactory DEFAULT_FACTORY =
- (deleteExecutor, checkpoints) -> {
+ (deleteExecutor, checkpoints, restoreMode) -> {
SharedStateRegistry sharedStateRegistry =
new SharedStateRegistryImpl(deleteExecutor);
for (CompletedCheckpoint checkpoint : checkpoints) {
-
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, restoreMode);
}
return sharedStateRegistry;
};
@@ -66,10 +67,25 @@ public interface SharedStateRegistry extends AutoCloseable {
/**
* Register given shared states in the registry.
*
+     * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after recovery), please use
+     * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+     *
     * @param stateHandles The shared states to register.
     * @param checkpointID which uses the states.
     */
    void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID);
+    /**
+     * Set the lowest checkpoint ID below which no state is discarded, inclusive.
+     *
+     * <p>After recovery from an incremental checkpoint, its state should NOT be discarded, even if
+     * {@link #unregisterUnusedState(long) not used} anymore (unless recovering in {@link
+     * RestoreMode#CLAIM CLAIM} mode).
+     *
+     * <p>This should hold for both cases: when recovering from that initial checkpoint; and from
+     * any subsequent checkpoint derived from it.
+     */
+    void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode);
+
void checkpointCompleted(long checkpointId);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
index bbdd2fd0959..bc8118cce42 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import java.util.Collection;
import java.util.concurrent.Executor;
@@ -29,10 +30,13 @@ public interface SharedStateRegistryFactory {
/**
* Factory method for {@link SharedStateRegistry}.
*
- * @param checkpoints whose shared state will be registered.
* @param deleteExecutor executor used to run (async) deletes.
+ * @param checkpoints whose shared state will be registered.
+ * @param restoreMode the mode in which the given checkpoints were restored
* @return a SharedStateRegistry object
*/
SharedStateRegistry create(
- Executor deleteExecutor, Collection<CompletedCheckpoint>
checkpoints);
+ Executor deleteExecutor,
+ Collection<CompletedCheckpoint> checkpoints,
+ RestoreMode restoreMode);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index b87d8646086..4dce6455277 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
@@ -51,6 +53,9 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
/** Executor for async state deletion */
private final Executor asyncDisposalExecutor;
+ /** Checkpoint ID below which no state is discarded, inclusive. */
+ private long highestNotClaimedCheckpointID = -1L;
+
/** Default uses direct executor to delete unreferenced state */
public SharedStateRegistryImpl() {
this(Executors.directExecutor());
@@ -147,7 +152,9 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
while (it.hasNext()) {
SharedStateEntry entry = it.next();
if (entry.lastUsedCheckpointID < lowestCheckpointID) {
- subsumed.add(entry.stateHandle);
+                if (entry.createdByCheckpointID > highestNotClaimedCheckpointID) {
+                    subsumed.add(entry.stateHandle);
+                }
it.remove();
}
}
@@ -174,6 +181,20 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
}
}
+    @Override
+    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
+        registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+        // In NO_CLAIM and LEGACY restore modes, shared state of the initial checkpoints must be
+        // preserved. This is achieved by advancing highestRetainCheckpointID here, and then
+        // checking entry.createdByCheckpointID against it on checkpoint subsumption.
+        // In CLAIM restore mode, the shared state of the initial checkpoints must be
+        // discarded as soon as it becomes unused - so highestRetainCheckpointID is not updated.
+        if (mode != RestoreMode.CLAIM) {
+            highestNotClaimedCheckpointID =
+                    Math.max(highestNotClaimedCheckpointID, checkpoint.getCheckpointID());
+        }
+    }
+
@Override
public void checkpointCompleted(long checkpointId) {
for (SharedStateEntry entry : registeredStates.values()) {
@@ -251,6 +272,8 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
/** The shared state handle */
StreamStateHandle stateHandle;
+ private final long createdByCheckpointID;
+
private long lastUsedCheckpointID;
/** Whether this entry is included into a confirmed checkpoint. */
@@ -258,6 +281,7 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
SharedStateEntry(StreamStateHandle value, long checkpointID) {
this.stateHandle = value;
+ this.createdByCheckpointID = checkpointID;
this.lastUsedCheckpointID = checkpointID;
}
@@ -266,6 +290,8 @@ public class SharedStateRegistryImpl implements
SharedStateRegistry {
return "SharedStateEntry{"
+ "stateHandle="
+ stateHandle
+ + ", createdByCheckpointID="
+ + createdByCheckpointID
+ ", lastUsedCheckpointID="
+ lastUsedCheckpointID
+ '}';
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index c62b0c3bde0..2de67d006d2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -34,6 +34,7 @@ import
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import
org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
@@ -569,6 +570,7 @@ public class ZooKeeperUtils {
* @param configuration {@link Configuration} object
* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints
to retain
* @param executor to run ZooKeeper callbacks
+ * @param restoreMode the mode in which the job is being restored
* @return {@link DefaultCompletedCheckpointStore} instance
* @throws Exception if the completed checkpoint store cannot be created
*/
@@ -578,7 +580,8 @@ public class ZooKeeperUtils {
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
- Executor executor)
+ Executor executor,
+ RestoreMode restoreMode)
throws Exception {
checkNotNull(configuration, "Configuration");
@@ -597,7 +600,8 @@ public class ZooKeeperUtils {
completedCheckpointStateHandleStore,
ZooKeeperCheckpointStoreUtil.INSTANCE,
completedCheckpoints,
- sharedStateRegistryFactory.create(ioExecutor,
completedCheckpoints),
+ sharedStateRegistryFactory.create(
+ ioExecutor, completedCheckpoints, restoreMode),
executor);
LOG.info(
"Initialized {} in '{}' with {}.",
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 36745153959..f242f92e387 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.state.InputChannelStateHandle;
@@ -280,7 +281,7 @@ public class CheckpointCoordinatorFailureTest extends
TestLogger {
public FailingCompletedCheckpointStore(Exception addCheckpointFailure)
{
super(
SharedStateRegistry.DEFAULT_FACTORY.create(
- Executors.directExecutor(), emptyList()));
+ Executors.directExecutor(), emptyList(),
RestoreMode.DEFAULT));
this.addCheckpointFailure = addCheckpointFailure;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 3b839739663..3e9fb12d51a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -234,7 +235,7 @@ public class CheckpointCoordinatorRestoringTest extends
TestLogger {
final ExecutionGraph executionGraph = createExecutionGraph(vertices);
final EmbeddedCompletedCheckpointStore store =
new EmbeddedCompletedCheckpointStore(
- completedCheckpoints.size(), completedCheckpoints);
+ completedCheckpoints.size(), completedCheckpoints,
RestoreMode.DEFAULT);
// set up the coordinator and validate the initial state
final CheckpointCoordinator coordinator =
@@ -778,7 +779,8 @@ public class CheckpointCoordinatorRestoringTest extends
TestLogger {
// set up the coordinator and validate the initial state
SharedStateRegistry sharedStateRegistry =
-
SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(),
emptyList());
+ SharedStateRegistry.DEFAULT_FACTORY.create(
+ Executors.directExecutor(), emptyList(),
RestoreMode.DEFAULT);
CheckpointCoordinator coord =
new CheckpointCoordinatorBuilder()
.setExecutionGraph(newGraph)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 30af82fbe7d..9d3a87ddd84 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,7 @@ import
org.apache.flink.runtime.executiongraph.ExecutionVertex;
import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
@@ -2867,163 +2868,178 @@ public class CheckpointCoordinatorTest extends
TestLogger {
@Test
public void testSharedStateRegistrationOnRestore() throws Exception {
- JobVertexID jobVertexID1 = new JobVertexID();
+ for (RestoreMode restoreMode : RestoreMode.values()) {
+ JobVertexID jobVertexID1 = new JobVertexID();
- int parallelism1 = 2;
- int maxParallelism1 = 4;
+ int parallelism1 = 2;
+ int maxParallelism1 = 4;
- ExecutionGraph graph =
- new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
- .addJobVertex(jobVertexID1, parallelism1,
maxParallelism1)
- .build();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID1, parallelism1,
maxParallelism1)
+ .build();
- ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
+ ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
- List<CompletedCheckpoint> checkpoints = Collections.emptyList();
- SharedStateRegistry firstInstance =
- SharedStateRegistry.DEFAULT_FACTORY.create(
-
org.apache.flink.util.concurrent.Executors.directExecutor(), checkpoints);
- final EmbeddedCompletedCheckpointStore store =
- new EmbeddedCompletedCheckpointStore(10, checkpoints,
firstInstance);
+ List<CompletedCheckpoint> checkpoints = Collections.emptyList();
+ SharedStateRegistry firstInstance =
+ SharedStateRegistry.DEFAULT_FACTORY.create(
+
org.apache.flink.util.concurrent.Executors.directExecutor(),
+ checkpoints,
+ restoreMode);
+ final EmbeddedCompletedCheckpointStore store =
+ new EmbeddedCompletedCheckpointStore(10, checkpoints,
firstInstance);
- // set up the coordinator and validate the initial state
- final CheckpointCoordinatorBuilder coordinatorBuilder =
- new CheckpointCoordinatorBuilder()
- .setExecutionGraph(graph)
- .setTimer(manuallyTriggeredScheduledExecutor);
- final CheckpointCoordinator coordinator =
- coordinatorBuilder.setCompletedCheckpointStore(store).build();
+ // set up the coordinator and validate the initial state
+ final CheckpointCoordinatorBuilder coordinatorBuilder =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setTimer(manuallyTriggeredScheduledExecutor);
+ final CheckpointCoordinator coordinator =
+
coordinatorBuilder.setCompletedCheckpointStore(store).build();
- final int numCheckpoints = 3;
+ final int numCheckpoints = 3;
- List<KeyGroupRange> keyGroupPartitions1 =
-
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1,
parallelism1);
+ List<KeyGroupRange> keyGroupPartitions1 =
+ StateAssignmentOperation.createKeyGroupPartitions(
+ maxParallelism1, parallelism1);
- for (int i = 0; i < numCheckpoints; ++i) {
- performIncrementalCheckpoint(
- graph.getJobID(), coordinator, jobVertex1,
keyGroupPartitions1, i);
- }
+ for (int i = 0; i < numCheckpoints; ++i) {
+ performIncrementalCheckpoint(
+ graph.getJobID(), coordinator, jobVertex1,
keyGroupPartitions1, i);
+ }
- List<CompletedCheckpoint> completedCheckpoints =
coordinator.getSuccessfulCheckpoints();
- assertEquals(numCheckpoints, completedCheckpoints.size());
+ List<CompletedCheckpoint> completedCheckpoints =
coordinator.getSuccessfulCheckpoints();
+ assertEquals(numCheckpoints, completedCheckpoints.size());
- int sharedHandleCount = 0;
+ int sharedHandleCount = 0;
- List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint =
- new ArrayList<>(numCheckpoints);
+ List<Map<StateHandleID, StreamStateHandle>>
sharedHandlesByCheckpoint =
+ new ArrayList<>(numCheckpoints);
- for (int i = 0; i < numCheckpoints; ++i) {
- sharedHandlesByCheckpoint.add(new HashMap<>(2));
- }
+ for (int i = 0; i < numCheckpoints; ++i) {
+ sharedHandlesByCheckpoint.add(new HashMap<>(2));
+ }
- int cp = 0;
- for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
- for (OperatorState taskState :
completedCheckpoint.getOperatorStates().values()) {
- for (OperatorSubtaskState subtaskState :
taskState.getStates()) {
- for (KeyedStateHandle keyedStateHandle :
subtaskState.getManagedKeyedState()) {
- // test we are once registered with the current
registry
- verify(keyedStateHandle, times(1))
- .registerSharedStates(
- firstInstance,
completedCheckpoint.getCheckpointID());
- IncrementalRemoteKeyedStateHandle
incrementalKeyedStateHandle =
- (IncrementalRemoteKeyedStateHandle)
keyedStateHandle;
-
- sharedHandlesByCheckpoint
- .get(cp)
-
.putAll(incrementalKeyedStateHandle.getSharedState());
-
- for (StreamStateHandle streamStateHandle :
-
incrementalKeyedStateHandle.getSharedState().values()) {
- assertTrue(
- !(streamStateHandle instanceof
PlaceholderStreamStateHandle));
- verify(streamStateHandle, never()).discardState();
- ++sharedHandleCount;
- }
+ int cp = 0;
+ for (CompletedCheckpoint completedCheckpoint :
completedCheckpoints) {
+ for (OperatorState taskState :
completedCheckpoint.getOperatorStates().values()) {
+ for (OperatorSubtaskState subtaskState :
taskState.getStates()) {
+ for (KeyedStateHandle keyedStateHandle :
+ subtaskState.getManagedKeyedState()) {
+ // test we are once registered with the current
registry
+ verify(keyedStateHandle, times(1))
+ .registerSharedStates(
+ firstInstance,
completedCheckpoint.getCheckpointID());
+ IncrementalRemoteKeyedStateHandle
incrementalKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle)
keyedStateHandle;
+
+ sharedHandlesByCheckpoint
+ .get(cp)
+
.putAll(incrementalKeyedStateHandle.getSharedState());
+
+ for (StreamStateHandle streamStateHandle :
+
incrementalKeyedStateHandle.getSharedState().values()) {
+ assertTrue(
+ !(streamStateHandle
+ instanceof
PlaceholderStreamStateHandle));
+ verify(streamStateHandle,
never()).discardState();
+ ++sharedHandleCount;
+ }
+
+ for (StreamStateHandle streamStateHandle :
+
incrementalKeyedStateHandle.getPrivateState().values()) {
+ verify(streamStateHandle,
never()).discardState();
+ }
- for (StreamStateHandle streamStateHandle :
-
incrementalKeyedStateHandle.getPrivateState().values()) {
- verify(streamStateHandle, never()).discardState();
+
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
+ .discardState();
}
-
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
- .discardState();
+ verify(subtaskState, never()).discardState();
}
-
- verify(subtaskState, never()).discardState();
}
+ ++cp;
}
- ++cp;
- }
- // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
- assertEquals(10, sharedHandleCount);
+ // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
+ assertEquals(10, sharedHandleCount);
- // discard CP0
- store.removeOldestCheckpoint();
+ // discard CP0
+ store.removeOldestCheckpoint();
- // we expect no shared state was discarded because the state of CP0 is
still referenced by
- // CP1
- for (Map<StateHandleID, StreamStateHandle> cpList :
sharedHandlesByCheckpoint) {
- for (StreamStateHandle streamStateHandle : cpList.values()) {
- verify(streamStateHandle, never()).discardState();
+ // we expect no shared state was discarded because the state of
CP0 is still referenced
+ // by
+ // CP1
+ for (Map<StateHandleID, StreamStateHandle> cpList :
sharedHandlesByCheckpoint) {
+ for (StreamStateHandle streamStateHandle : cpList.values()) {
+ verify(streamStateHandle, never()).discardState();
+ }
}
- }
- // shutdown the store
- store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
-
- // restore the store
- Set<ExecutionJobVertex> tasks = new HashSet<>();
- tasks.add(jobVertex1);
-
- assertEquals(JobStatus.SUSPENDED,
store.getShutdownStatus().orElse(null));
- SharedStateRegistry secondInstance =
- SharedStateRegistry.DEFAULT_FACTORY.create(
-
org.apache.flink.util.concurrent.Executors.directExecutor(),
- store.getAllCheckpoints());
- final EmbeddedCompletedCheckpointStore secondStore =
- new EmbeddedCompletedCheckpointStore(10,
store.getAllCheckpoints(), secondInstance);
- final CheckpointCoordinator secondCoordinator =
-
coordinatorBuilder.setCompletedCheckpointStore(secondStore).build();
-
assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
-
- // validate that all shared states are registered again after the
recovery.
- cp = 0;
- for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
- for (OperatorState taskState :
completedCheckpoint.getOperatorStates().values()) {
- for (OperatorSubtaskState subtaskState :
taskState.getStates()) {
- for (KeyedStateHandle keyedStateHandle :
subtaskState.getManagedKeyedState()) {
- VerificationMode verificationMode;
- // test we are once registered with the new registry
- if (cp > 0) {
- verificationMode = times(1);
- } else {
- verificationMode = never();
- }
+ // shutdown the store
+ store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
+
+ // restore the store
+ Set<ExecutionJobVertex> tasks = new HashSet<>();
+ tasks.add(jobVertex1);
+
+ assertEquals(JobStatus.SUSPENDED,
store.getShutdownStatus().orElse(null));
+ SharedStateRegistry secondInstance =
+ SharedStateRegistry.DEFAULT_FACTORY.create(
+
org.apache.flink.util.concurrent.Executors.directExecutor(),
+ store.getAllCheckpoints(),
+ restoreMode);
+ final EmbeddedCompletedCheckpointStore secondStore =
+ new EmbeddedCompletedCheckpointStore(
+ 10, store.getAllCheckpoints(), secondInstance);
+ final CheckpointCoordinator secondCoordinator =
+
coordinatorBuilder.setCompletedCheckpointStore(secondStore).build();
+
assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
+
+ // validate that all shared states are registered again after the
recovery.
+ cp = 0;
+ for (CompletedCheckpoint completedCheckpoint :
completedCheckpoints) {
+ for (OperatorState taskState :
completedCheckpoint.getOperatorStates().values()) {
+ for (OperatorSubtaskState subtaskState :
taskState.getStates()) {
+ for (KeyedStateHandle keyedStateHandle :
+ subtaskState.getManagedKeyedState()) {
+ VerificationMode verificationMode;
+ // test we are once registered with the new
registry
+ if (cp > 0) {
+ verificationMode = times(1);
+ } else {
+ verificationMode = never();
+ }
- // check that all are registered with the new registry
- verify(keyedStateHandle, verificationMode)
- .registerSharedStates(
- secondInstance,
completedCheckpoint.getCheckpointID());
+ // check that all are registered with the new
registry
+ verify(keyedStateHandle, verificationMode)
+ .registerSharedStates(
+ secondInstance,
completedCheckpoint.getCheckpointID());
+ }
}
}
+ ++cp;
}
- ++cp;
- }
- // discard CP1
- secondStore.removeOldestCheckpoint();
+ // discard CP1
+ secondStore.removeOldestCheckpoint();
- // we expect that all shared state from CP0 is no longer referenced
and discarded. CP2 is
- // still live and also
- // references the state from CP1, so we expect they are not discarded.
- verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 0 ? times(1)
: never());
+ // we expect that all shared state from CP0 is no longer
referenced and discarded. CP2
+ // is
+ // still live and also
+ // references the state from CP1, so we expect they are not
discarded.
+ verifyDiscard(
+ sharedHandlesByCheckpoint,
+ cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ?
times(1) : never());
- // discard CP2
- secondStore.removeOldestCheckpoint();
+ // discard CP2
+ secondStore.removeOldestCheckpoint();
- // still expect shared state not to be discarded because it may be
used in later checkpoints
- verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ? never() :
atLeast(0));
+ // still expect shared state not to be discarded because it may be
used in later
+ // checkpoints
+ verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ?
never() : atLeast(0));
+ }
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index b6461a9c984..a0b67cd0dfa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
@@ -235,7 +236,7 @@ public class CompletedCheckpointTest {
null);
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistryImpl();
- checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+ checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry,
RestoreMode.DEFAULT);
verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
}
@@ -267,7 +268,7 @@ public class CompletedCheckpointTest {
null);
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistryImpl();
- checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+ checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry,
RestoreMode.DEFAULT);
verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
// Subsume
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 064e26a9b9f..22cd5dd021c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.persistence.StateHandleStore;
import
org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
@@ -397,7 +398,9 @@ public class DefaultCompletedCheckpointStoreTest extends
TestLogger {
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
stateHandleStore, checkpointStoreUtil),
SharedStateRegistry.DEFAULT_FACTORY.create(
-
org.apache.flink.util.concurrent.Executors.directExecutor(), emptyList()),
+
org.apache.flink.util.concurrent.Executors.directExecutor(),
+ emptyList(),
+ RestoreMode.DEFAULT),
executorService);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
index 58a0d954318..7203edfc709 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
@@ -49,7 +50,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
firstJobId,
1,
SharedStateRegistry.DEFAULT_FACTORY,
- Executors.directExecutor()));
+ Executors.directExecutor(),
+ RestoreMode.DEFAULT));
assertThrows(
UnsupportedOperationException.class,
() ->
@@ -57,7 +59,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
firstJobId,
1,
SharedStateRegistry.DEFAULT_FACTORY,
- Executors.directExecutor()));
+ Executors.directExecutor(),
+ RestoreMode.DEFAULT));
final JobID secondJobId = new JobID();
assertSame(
@@ -66,7 +69,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
secondJobId,
1,
SharedStateRegistry.DEFAULT_FACTORY,
- Executors.directExecutor()));
+ Executors.directExecutor(),
+ RestoreMode.DEFAULT));
assertThrows(
UnsupportedOperationException.class,
() ->
@@ -74,6 +78,7 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
secondJobId,
1,
SharedStateRegistry.DEFAULT_FACTORY,
- Executors.directExecutor()));
+ Executors.directExecutor(),
+ RestoreMode.DEFAULT));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index e164543d0ab..687196dff82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import java.util.concurrent.Executor;
@@ -39,7 +40,8 @@ public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFacto
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ RestoreMode restoreMode) {
return store;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 240955927bb..5daad82414d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
@@ -92,7 +93,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
checkpointStoreUtil,
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
checkpointsInZooKeeper, checkpointStoreUtil),
- SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), emptyList()),
+ SharedStateRegistry.DEFAULT_FACTORY.create(
+ Executors.directExecutor(), emptyList(), RestoreMode.DEFAULT),
executor);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 804d285ae28..cd8561157cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -196,7 +197,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
checkpointsInZooKeeper,
zooKeeperCheckpointStoreUtil,
Collections.emptyList(),
- SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), emptyList()),
+ SharedStateRegistry.DEFAULT_FACTORY.create(
+ Executors.directExecutor(), emptyList(), RestoreMode.DEFAULT),
Executors.directExecutor());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 4b4ce274333..e7b9c57db97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResul
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
@@ -87,7 +88,11 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
super.setUp();
haServices.setCheckpointRecoveryFactory(
new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
- (maxCheckpoints, previous, sharedStateRegistryFactory, ioExecutor) -> {
+ (maxCheckpoints,
+ previous,
+ sharedStateRegistryFactory,
+ ioExecutor,
+ restoreMode) -> {
if (previous != null) {
// First job cleanup still succeeded for the
// CompletedCheckpointStore because the JobGraph cleanup happens
@@ -98,13 +103,17 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
maxCheckpoints,
previous.getAllCheckpoints(),
sharedStateRegistryFactory.create(
- ioExecutor, previous.getAllCheckpoints()));
+ ioExecutor,
+ previous.getAllCheckpoints(),
+ restoreMode));
}
return new EmbeddedCompletedCheckpointStore(
maxCheckpoints,
Collections.emptyList(),
sharedStateRegistryFactory.create(
- ioExecutor, Collections.emptyList()));
+ ioExecutor,
+ Collections.emptyList(),
+ RestoreMode.DEFAULT));
}));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
index 8a226fed8aa..cedcfc3d435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
@@ -622,7 +623,8 @@ public class CheckpointResourcesCleanupRunnerTest {
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor)
+ Executor ioExecutor,
+ RestoreMode restoreMode)
throws Exception {
creationLatch.await();
return completedCheckpointStore;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index dfc3b40da07..9e21dd42e08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -78,7 +79,8 @@ public class SchedulerUtilsTest extends TestLogger {
new StandaloneCheckpointRecoveryFactory(),
Executors.directExecutor(),
log,
- new JobID());
+ new JobID(),
+ RestoreMode.CLAIM);
assertEquals(
maxNumberOfCheckpointsToRetain,
@@ -104,7 +106,8 @@ public class SchedulerUtilsTest extends TestLogger {
recoveryFactory,
Executors.directExecutor(),
log,
- new JobID());
+ new JobID(),
+ RestoreMode.CLAIM);
SharedStateRegistry sharedStateRegistry = checkpointStore.getSharedStateRegistry();
@@ -122,12 +125,14 @@ public class SchedulerUtilsTest extends TestLogger {
JobID jobId,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ RestoreMode restoreMode) {
List<CompletedCheckpoint> checkpoints = singletonList(checkpoint);
return new EmbeddedCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
checkpoints,
- sharedStateRegistryFactory.create(ioExecutor, checkpoints));
+ sharedStateRegistryFactory.create(
+ ioExecutor, checkpoints, RestoreMode.DEFAULT));
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 678eb2bac1f..4392a4ba67b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -57,6 +57,7 @@ import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.jupiter.api.Assertions.fail;
/** This class contains auxiliary methods for unit tests. */
@@ -363,18 +364,31 @@ public class CommonTestUtils {
Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
}
- /** Wait for at least one successful checkpoint. */
- public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, Deadline timeout)
+ /** Wait for (at least) the given number of successful checkpoints. */
+ public static void waitForCheckpoint(
+ JobID jobID, MiniCluster miniCluster, Deadline timeout, int numCheckpoints)
throws Exception, FlinkJobNotFoundException {
waitUntilCondition(
- () ->
- Optional.ofNullable(
- miniCluster
- .getExecutionGraph(jobID)
- .get()
- .getCheckpointStatsSnapshot())
- .filter(st -> st.getCounts().getNumberOfCompletedCheckpoints() > 0)
- .isPresent(),
+ () -> {
+ AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
+ if (Optional.ofNullable(graph.getCheckpointStatsSnapshot())
+ .filter(
+ st ->
+ st.getCounts().getNumberOfCompletedCheckpoints()
+ >= numCheckpoints)
+ .isPresent()) {
+ return true;
+ } else if (graph.getState().isGloballyTerminalState()) {
+ checkState(
+ graph.getFailureInfo() != null,
+ "Job terminated before taking required %s
checkpoints: %s",
+ numCheckpoints,
+ graph.getState());
+ throw graph.getFailureInfo().getException();
+ } else {
+ return false;
+ }
+ },
timeout);
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index eeda1b08b32..03dfa55cf22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -18,25 +18,26 @@
package org.apache.flink.test.checkpointing;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,18 +52,18 @@ import org.apache.curator.test.TestingServer;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Optional;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Stream;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
+import static org.apache.flink.test.util.TestUtils.waitUntilJobCanceled;
import static org.junit.Assert.assertNotNull;
/**
@@ -74,26 +75,42 @@ import static org.junit.Assert.assertNotNull;
* <p>This tests considers full and incremental checkpoints and was introduced to guard against
* problems like FLINK-6964.
*/
+@RunWith(Parameterized.class)
public class ResumeCheckpointManuallyITCase extends TestLogger {
private static final int PARALLELISM = 2;
private static final int NUM_TASK_MANAGERS = 2;
private static final int SLOTS_PER_TASK_MANAGER = 2;
+ @Parameterized.Parameter public RestoreMode restoreMode;
+
+ @Parameterized.Parameters(name = "RestoreMode = {0}")
+ public static Object[] parameters() {
+ return RestoreMode.values();
+ }
+
@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testExternalizedIncrementalRocksDBCheckpointsStandalone()
throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createRocksDBStateBackend(checkpointDir, true), false);
+ checkpointDir,
+ null,
+ createRocksDBStateBackend(checkpointDir, true),
+ false,
+ restoreMode);
}
@Test
public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createRocksDBStateBackend(checkpointDir, false), false);
+ checkpointDir,
+ null,
+ createRocksDBStateBackend(checkpointDir, false),
+ false,
+ restoreMode);
}
@Test
@@ -101,7 +118,11 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createRocksDBStateBackend(checkpointDir, true), true);
+ checkpointDir,
+ null,
+ createRocksDBStateBackend(checkpointDir, true),
+ true,
+ restoreMode);
}
@Test
@@ -109,20 +130,25 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createRocksDBStateBackend(checkpointDir, false), true);
+ checkpointDir,
+ null,
+ createRocksDBStateBackend(checkpointDir, false),
+ true,
+ restoreMode);
}
@Test
public void testExternalizedFSCheckpointsStandalone() throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
- checkpointDir, null, createFsStateBackend(checkpointDir), false);
+ checkpointDir, null, createFsStateBackend(checkpointDir), false, restoreMode);
}
@Test
public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone()
throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
- testExternalizedCheckpoints(checkpointDir, null, createFsStateBackend(checkpointDir), true);
+ testExternalizedCheckpoints(
+ checkpointDir, null, createFsStateBackend(checkpointDir), true, restoreMode);
}
@Test
@@ -135,7 +161,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
checkpointDir,
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, true),
- false);
+ false,
+ restoreMode);
} finally {
zkServer.stop();
}
@@ -151,7 +178,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
checkpointDir,
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, false),
- false);
+ false,
+ restoreMode);
} finally {
zkServer.stop();
}
@@ -168,7 +196,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
checkpointDir,
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, true),
- true);
+ true,
+ restoreMode);
} finally {
zkServer.stop();
}
@@ -185,7 +214,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
checkpointDir,
zkServer.getConnectString(),
createRocksDBStateBackend(checkpointDir, false),
- true);
+ true,
+ restoreMode);
} finally {
zkServer.stop();
}
@@ -201,7 +231,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
checkpointDir,
zkServer.getConnectString(),
createFsStateBackend(checkpointDir),
- false);
+ false,
+ restoreMode);
} finally {
zkServer.stop();
}
@@ -217,7 +248,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
checkpointDir,
zkServer.getConnectString(),
createFsStateBackend(checkpointDir),
- true);
+ true,
+ restoreMode);
} finally {
zkServer.stop();
}
@@ -233,8 +265,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
return new RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing);
}
- private void testExternalizedCheckpoints(
- File checkpointDir, String zooKeeperQuorum, StateBackend backend, boolean localRecovery)
+ private static void testExternalizedCheckpoints(
+ File checkpointDir,
+ String zooKeeperQuorum,
+ StateBackend backend,
+ boolean localRecovery,
+ RestoreMode restoreMode)
throws Exception {
final Configuration config = new Configuration();
@@ -270,22 +306,28 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
cluster.before();
- ClusterClient<?> client = cluster.getClusterClient();
-
try {
// main test sequence: start job -> eCP -> restore job -> eCP -> restore job
String firstExternalCheckpoint =
- runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+ runJobAndGetExternalizedCheckpoint(backend, null, cluster, restoreMode);
assertNotNull(firstExternalCheckpoint);
String secondExternalCheckpoint =
runJobAndGetExternalizedCheckpoint(
- backend, checkpointDir, firstExternalCheckpoint, client);
+ backend, firstExternalCheckpoint, cluster, restoreMode);
assertNotNull(secondExternalCheckpoint);
String thirdExternalCheckpoint =
runJobAndGetExternalizedCheckpoint(
- backend, checkpointDir, secondExternalCheckpoint, client);
+ backend,
+ // in CLAIM mode, the previous run is only guaranteed to preserve the
+ // latest checkpoint; in NO_CLAIM/LEGACY, even the initial checkpoints
+ // must remain valid
+ restoreMode == RestoreMode.CLAIM
+ ? secondExternalCheckpoint
+ : firstExternalCheckpoint,
+ cluster,
+ restoreMode);
assertNotNull(thirdExternalCheckpoint);
} finally {
cluster.after();
@@ -294,77 +336,35 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
private static String runJobAndGetExternalizedCheckpoint(
StateBackend backend,
- File checkpointDir,
@Nullable String externalCheckpoint,
- ClusterClient<?> client)
+ MiniClusterWithClientResource cluster,
+ RestoreMode restoreMode)
throws Exception {
- JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
+ JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, restoreMode);
NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
-
- client.submitJob(initialJobGraph).get();
+ cluster.getClusterClient().submitJob(initialJobGraph).get();
// wait until all sources have been started
NotifyingInfiniteTupleSource.countDownLatch.await();
- waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
- client.cancel(initialJobGraph.getJobID()).get();
- waitUntilCanceled(initialJobGraph.getJobID(), client);
-
- return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
- }
-
- private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId)
- throws IOException {
- Optional<Path> checkpoint = findExternalizedCheckpoint(checkpointDir, jobId);
- if (!checkpoint.isPresent()) {
- throw new AssertionError("No complete checkpoint could be found.");
- } else {
- return checkpoint.get().toString();
- }
- }
-
- private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId)
- throws InterruptedException, IOException {
- while (true) {
- Thread.sleep(50);
- Optional<Path> externalizedCheckpoint =
- findExternalizedCheckpoint(checkpointDir, jobId);
- if (externalizedCheckpoint.isPresent()) {
- break;
- }
- }
- }
-
- private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId)
- throws IOException {
- try (Stream<Path> checkpoints =
- Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
- return checkpoints
- .filter(path -> path.getFileName().toString().startsWith("chk-"))
- .filter(
- path -> {
- try (Stream<Path> checkpointFiles = Files.list(path)) {
- return checkpointFiles.anyMatch(
- child ->
- child.getFileName()
- .toString()
- .contains("meta"));
- } catch (IOException ignored) {
- return false;
- }
- })
- .findAny();
- }
- }
-
- private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client)
- throws ExecutionException, InterruptedException {
- while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
- Thread.sleep(50);
- }
+ // complete at least two checkpoints so that the initial checkpoint can be subsumed
+ waitForCheckpoint(
+ initialJobGraph.getJobID(),
+ cluster.getMiniCluster(),
+ Deadline.fromNow(Duration.ofMinutes(5)),
+ 2);
+ cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get();
+ waitUntilJobCanceled(initialJobGraph.getJobID(), cluster.getClusterClient());
+ return CommonTestUtils.getLatestCompletedCheckpointPath(
+ initialJobGraph.getJobID(), cluster.getMiniCluster())
+ .<IllegalStateException>orElseThrow(
+ () -> {
+ throw new IllegalStateException("Checkpoint not generated");
+ });
}
- private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
+ private static JobGraph getJobGraph(
+ StateBackend backend, @Nullable String externalCheckpoint, RestoreMode restoreMode) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
@@ -373,6 +373,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
env.getCheckpointConfig()
.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.noRestart());
env.addSource(new NotifyingInfiniteTupleSource(10_000))
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
@@ -388,7 +389,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
// recover from previous iteration?
if (externalCheckpoint != null) {
jobGraph.setSavepointRestoreSettings(
- SavepointRestoreSettings.forPath(externalCheckpoint));
+ SavepointRestoreSettings.forPath(externalCheckpoint, false, restoreMode));
}
return jobGraph;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index ef4052c76a9..d4791ac2e63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -148,7 +148,8 @@ public class ChangelogCompatibilityITCase {
waitForCheckpoint(
jobGraph.getJobID(),
miniClusterResource.getMiniCluster(),
- Deadline.fromNow(Duration.ofMinutes(5)));
+ Deadline.fromNow(Duration.ofMinutes(5)),
+ 1);
client.cancel(jobGraph.getJobID()).get();
// obtain the latest checkpoint *after* cancellation - that one won't be subsumed
return CommonTestUtils.getLatestCompletedCheckpointPath(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index b3f70047537..c56b233d2e2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -327,7 +327,8 @@ public class ChangelogRescalingITCase extends TestLogger {
}
private String checkpointAndCancel(JobID jobID) throws Exception {
- waitForCheckpoint(jobID, cluster.getMiniCluster(), Deadline.fromNow(Duration.ofMinutes(5)));
+ waitForCheckpoint(
+ jobID, cluster.getMiniCluster(), Deadline.fromNow(Duration.ofMinutes(5)), 1);
cluster.getClusterClient().cancel(jobID).get();
checkStatus(jobID);
return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 033f1780a29..af58c57aad0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.test.util;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.JobClient;
@@ -43,6 +44,7 @@ import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Comparator;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
@@ -165,4 +167,11 @@ public class TestUtils {
return false; // should never happen
}
}
+
+ public static void waitUntilJobCanceled(JobID jobId, ClusterClient<?> client)
+ throws ExecutionException, InterruptedException {
+ while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
+ Thread.sleep(50);
+ }
+ }
}