This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd8c4855e24 [FLINK-32483][tests] Ensure RescaleCheckpointManuallyITCase always runs against aligned checkpoints.
dd8c4855e24 is described below

commit dd8c4855e24cfe4cabdcc60f8cb3fc0b268f1178
Author: David Moravek <[email protected]>
AuthorDate: Wed Oct 2 16:55:02 2024 +0200

    [FLINK-32483][tests] Ensure RescaleCheckpointManuallyITCase always runs against aligned checkpoints.
---
 .../RescaleCheckpointManuallyITCase.java           | 28 ++++++++++++----------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 2f09c022021..e85cfd8bdb6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -39,8 +40,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.CheckpointStorageUtils;
-import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
@@ -166,9 +165,6 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
                             miniCluster);
             miniCluster.submitJob(jobGraph).get();
             miniCluster.requestJobResult(jobGraph.getJobID()).get();
-            // The elements may not all be sent to sink when unaligned 
checkpoints enabled(refer to
-            // FLINK-26882 for more details).
-            // Don't verify current state here.
             return getLatestCompletedCheckpointPath(jobGraph.getJobID(), 
miniCluster)
                     .orElseThrow(
                             () ->
@@ -233,18 +229,26 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
             int checkpointingInterval,
             MiniCluster miniCluster)
             throws IOException {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final Configuration configuration = new Configuration();
+        configuration.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                temporaryFolder.newFolder().toURI().toString());
+        configuration.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        // Force Aligned Checkpoints. This is necessary to prevent test environment randomization
+        // from overriding it. The elements may not all be sent to sink when unaligned checkpoints
+        // enabled(refer to FLINK-26882 for more details).
+        configuration.set(CheckpointingOptions.ENABLE_UNALIGNED, false);
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(parallelism);
         if (0 < maxParallelism) {
             env.getConfig().setMaxParallelism(maxParallelism);
         }
         env.enableCheckpointing(checkpointingInterval);
-        CheckpointStorageUtils.configureFileSystemCheckpointStorage(
-                env, temporaryFolder.newFolder().toURI());
-        env.getCheckpointConfig()
-                .setExternalizedCheckpointRetention(
-                        
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
-        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.getConfig().setUseSnapshotCompression(true);
 
         SharedReference<JobID> jobID = sharedObjects.add(new JobID());

Reply via email to