fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r895296077


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);

Review Comment:
   This is done by making the **materialization interval** very long, so that 
no materialization happens and the old private state is still in use.
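
   To illustrate the reasoning, here is a minimal sketch in plain Java with no Flink dependency. It assumes that the last argument of `getEnv(...)` is the materialization interval in milliseconds and that the job runs for only a few seconds before it is cancelled; neither is confirmed by the diff. `changelogSettings` and `materializationsWithin` are hypothetical helpers, the option keys are the ones I believe Flink uses for the changelog state backend, and the timing model is simplified (the first materialization is taken to start one full interval after job start).

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

public class MaterializationIntervalSketch {

    // Hypothetical helper: the changelog-related settings for one run of the job.
    // The keys are assumed to match Flink's changelog state backend options.
    static Map<String, String> changelogSettings(boolean changelogEnabled, Duration interval) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("state.backend.changelog.enabled", Boolean.toString(changelogEnabled));
        settings.put(
                "state.backend.changelog.periodic-materialize.interval",
                interval.toMillis() + " ms");
        return settings;
    }

    // Simplified model (an assumption, not Flink's scheduler): one materialization
    // starts after each full interval that elapses while the job is running.
    static long materializationsWithin(Duration interval, Duration runTime) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return runTime.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        Duration assumedRunTime = Duration.ofSeconds(5); // assumed lifetime of the test job

        // Long interval: no materialization fits into the run, so the state
        // restored from the previous checkpoint stays referenced.
        Duration longInterval = Duration.ofMillis(60_000);
        System.out.println(changelogSettings(true, longInterval));
        System.out.println(materializationsWithin(longInterval, assumedRunTime));

        // Short interval: many materializations fit into the same run.
        Duration shortInterval = Duration.ofMillis(100);
        System.out.println(changelogSettings(true, shortInterval));
        System.out.println(materializationsWithin(shortInterval, assumedRunTime));
    }
}
```

   Under these assumptions the 60000 ms interval yields zero materializations during the run, which is what keeps the old private state in use, while a 100 ms interval would materialize repeatedly.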



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
