rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r894476700
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -50,6 +56,9 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
     @Nullable
     private CompletableFuture<Void> cleanUpFuture;
+    /** All subsumed checkpoints. */
+    private final Map<Long, CompletedCheckpoint> subsumedCheckpoints = new HashMap<>();
Review Comment:
Looks like this field can be a `List` now?
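For example, something along these lines (just a sketch, assuming the cleaner only iterates over the subsumed checkpoints and never looks them up by ID):

```java
/** All subsumed checkpoints, in the order in which they were subsumed. */
private final List<CompletedCheckpoint> subsumedCheckpoints = new ArrayList<>();
```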
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
Review Comment:
Could you please add javadoc to this method?
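Something along these lines, for instance (the wording is only a suggestion):

```java
/**
 * Clean up the subsumed checkpoints whose IDs are below {@code upTo}, skipping those whose
 * state is still in use.
 *
 * @param upTo the exclusive upper bound of checkpoint IDs that may be discarded
 * @param stillInUse IDs of checkpoints whose state is still referenced and must not be discarded
 * @param postCleanAction a callback to run after a checkpoint has been cleaned
 * @param executor the executor used to perform the cleanup asynchronously
 */
```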
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,12 +59,13 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
-    void unregisterUnusedState(long lowestCheckpointID);
+    Set<Long> unregisterUnusedState(long lowestCheckpointID);
Review Comment:
Could you please update the javadoc and describe what this method returns now?
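For example (just a suggestion - please adjust it to what the implementation actually returns):

```java
/**
 * Unregister state that is not referenced by the given checkpoint ID or any newer.
 *
 * @param lowestCheckpointID which is still valid
 * @return the IDs of the checkpoints whose shared state is still referenced and therefore
 *     must not be discarded yet
 */
Set<Long> unregisterUnusedState(long lowestCheckpointID);
```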
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);
Review Comment:
Could you please explain why this `sleep` is needed?
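If it is there to give the periodic materialization a chance to complete, then waiting for an explicit condition would be more robust than a fixed delay. A rough sketch with a plain polling loop and a hypothetical condition (it would need to be backed by something observable, e.g. a materialization counter or the checkpoint contents):

```java
// Hypothetical replacement for Thread.sleep(1000): poll a condition with a deadline
// instead of assuming the materialization finishes within a fixed amount of time.
long deadlineNanos = System.nanoTime() + Duration.ofSeconds(30).toNanos();
while (!materializationHasHappened()) { // hypothetical check
    if (System.nanoTime() > deadlineNanos) {
        throw new AssertionError("Materialization did not happen within the timeout");
    }
    Thread.sleep(50);
}
```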
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
Review Comment:
This test relies on materialization, but the interval is quite big and might prevent it from happening - should it be decreased?
And ideally, we should verify the presence of the materialized part - similar to `ChangelogPeriodicMaterializationITCase`.
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
Review Comment:
Could you please explain why this `sleep` is needed?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(
+                                checkpoint,
+                                checkpoint.shouldBeDiscardedOnSubsume(),
+                                postCleanAction,
+                                executor);
+                        iterator.remove();
+                    } catch (Exception e) {
+                        LOG.warn("Fail to discard the old checkpoint {}.", checkpoint);
+                    }
+                }
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void cleanSubsumedCheckpointsWithException(
+            long upTo,
+            Set<Long> stillInUse,
+            Runnable postCleanAction,
+            Executor executor,
+            DiscardException discardException) {
Review Comment:
IIUC, this method is only used in a test, and that test only verifies this method's behavior. This doesn't make much sense to me - maybe we can just remove both?
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
    }
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        assertFalse(checkpointFolderExists(firstRestorePath.substring(5)));
Review Comment:
1. Ideally, folder deletion should also be tested in the case of checkpoint subsumption.
2. Is it guaranteed that the folder is cleaned up by the time `cancelJob.get` returns? If not, the assertion might be flaky.
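Regarding 2: if the discard may still be running when `cancelJob.get` returns, polling for the deletion would make the assertion less timing-sensitive. A rough sketch, reusing the test's existing `checkpointFolderExists` helper (the timeout value is arbitrary):

```java
// Poll until the old checkpoint folder disappears instead of asserting immediately,
// since the discard may complete asynchronously after the job is cancelled.
long deadline = System.currentTimeMillis() + 30_000;
while (checkpointFolderExists(firstRestorePath.substring(5))) {
    if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("Checkpoint folder was not deleted: " + firstRestorePath);
    }
    Thread.sleep(100);
}
```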
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
Review Comment:
There's no guarantee that private state is still needed - it could be that
the delegated state backend materialized with completely new private state,
right? (just asking)
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(
Review Comment:
I am concerned about `synchronized (subsumedCheckpoints)`:
`cleanCheckpoint` eventually calls `incrementNumberOfCheckpointsToClean`
which is synchronized on `this`. Synchronizing on a single object would make
reasoning easier and less error-prone, WDYT?
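For example, something like this (just a sketch; field and method names other than `subsumedCheckpoints` are illustrative, and it assumes the counter updates are moved under the same lock):

```java
private final Object lock = new Object();

// guarded by lock
private final List<CompletedCheckpoint> subsumedCheckpoints = new ArrayList<>();

// guarded by lock
private int numberOfCheckpointsToClean;

public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
    synchronized (lock) {
        subsumedCheckpoints.add(completedCheckpoint);
    }
}

private void incrementNumberOfCheckpointsToClean() {
    synchronized (lock) {
        numberOfCheckpointsToClean++;
    }
}
```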