fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r895303274
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
}
+ @Test
+ public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+ File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+ MiniCluster miniCluster = cluster.getMiniCluster();
+ StreamExecutionEnvironment env1 =
+ getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+ JobGraph firstJobGraph = buildJobGraph(env1);
+
+ miniCluster.submitJob(firstJobGraph).get();
+ waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+ miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+ miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+ String firstRestorePath =
+ getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+ // 1st restore, switch from disable to enable
+ File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+ StreamExecutionEnvironment env2 =
+ getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+ JobGraph secondJobGraph = buildJobGraph(env2);
+ setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+ miniCluster.submitJob(secondJobGraph).get();
+ waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+ miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+ miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+ String secondRestorePath =
+ getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+ // 2nd restore, private state of first restore checkpoint still exists.
+ File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+ StreamExecutionEnvironment env3 =
+ getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+ JobGraph thirdJobGraph = buildJobGraph(env3);
+ setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+ miniCluster.submitJob(thirdJobGraph).get();
+ waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+ Thread.sleep(500);
+ miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+ miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+ }
+
+ @Test
+ public void testCheckpointFolderDeletion() throws Exception {
+ File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+ MiniCluster miniCluster = cluster.getMiniCluster();
+ StreamExecutionEnvironment env1 =
+ getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+ JobGraph firstJobGraph = buildJobGraph(env1);
+
+ miniCluster.submitJob(firstJobGraph).get();
+ waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+ miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+ miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+ String firstRestorePath =
+ getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+ // cancel after next materialization
+ File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+ StreamExecutionEnvironment env2 =
+ getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+ JobGraph secondJobGraph = buildJobGraph(env2);
+ setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+ miniCluster.submitJob(secondJobGraph).get();
+ waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+ Thread.sleep(1000);
+ miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+ miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+ assertFalse(checkpointFolderExists(firstRestorePath.substring(5)));
Review Comment:
> 2. Is it guaranteed that the folder is cleaned up by the time `cancelJob.get` returns? If not, the assertion might be flaky.

The folder may not be cleaned up until `checkpoint subsumption` or `shutdown`, so you are right: the assertion might be flaky. Since this test is not stable, I have deleted it. Folder deletion can be tested by `CheckpointsCleanerTest#testCleanSubsumedCheckpointNormal` instead.
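The race discussed above can be illustrated with a small, self-contained sketch. Instead of asserting once right after `cancelJob(...).get()`, it polls for the folder's disappearance until a deadline. It assumes the restore path is a local `file:` URI (the prefix that `substring(5)` strips in the deleted test); `CheckpointFolderDeletionWait`, `toLocalPath`, `waitUntil`, and `waitForFolderDeletion` are hypothetical helpers, not Flink APIs.

```java
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical test helper (not part of Flink): waits for asynchronous checkpoint
 * folder cleanup instead of asserting immediately after the job is cancelled.
 */
final class CheckpointFolderDeletionWait {

    private CheckpointFolderDeletionWait() {}

    /** Maps a checkpoint pointer such as "file:/tmp/x/chk-1" to a local path (assumes a file: URI). */
    static Path toLocalPath(String checkpointPointer) {
        URI uri = URI.create(checkpointPointer);
        // Pointers without a scheme are treated as plain local paths.
        return uri.getScheme() == null ? Paths.get(checkpointPointer) : Paths.get(uri);
    }

    /** Returns true as soon as the condition holds, or false once the timeout has elapsed. */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    /** Polls every 50 ms until the folder behind the pointer no longer exists. */
    static boolean waitForFolderDeletion(String checkpointPointer, Duration timeout)
            throws InterruptedException {
        Path folder = toLocalPath(checkpointPointer);
        return waitUntil(() -> !Files.exists(folder), timeout, Duration.ofMillis(50));
    }
}
```

A bounded wait only turns the race into a timeout: if the folder is removed only on checkpoint subsumption or shutdown and neither happens before the deadline, the wait still fails. That is consistent with covering deletion at the unit level in `CheckpointsCleanerTest#testCleanSubsumedCheckpointNormal` rather than in this ITCase.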
--
This is an automated message from the Apache Git Service.