This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dd8c4855e24 [FLINK-32483][tests] Ensure
RescaleCheckpointManuallyITCase always runs against aligned checkpoints.
dd8c4855e24 is described below
commit dd8c4855e24cfe4cabdcc60f8cb3fc0b268f1178
Author: David Moravek <[email protected]>
AuthorDate: Wed Oct 2 16:55:02 2024 +0200
[FLINK-32483][tests] Ensure RescaleCheckpointManuallyITCase always runs
against aligned checkpoints.
---
.../RescaleCheckpointManuallyITCase.java | 28 ++++++++++++----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 2f09c022021..e85cfd8bdb6 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -39,8 +40,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.CheckpointStorageUtils;
-import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
@@ -166,9 +165,6 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
miniCluster);
miniCluster.submitJob(jobGraph).get();
miniCluster.requestJobResult(jobGraph.getJobID()).get();
- // The elements may not all be sent to sink when unaligned
checkpoints enabled(refer to
- // FLINK-26882 for more details).
- // Don't verify current state here.
return getLatestCompletedCheckpointPath(jobGraph.getJobID(),
miniCluster)
.orElseThrow(
() ->
@@ -233,18 +229,26 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
int checkpointingInterval,
MiniCluster miniCluster)
throws IOException {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final Configuration configuration = new Configuration();
+ configuration.set(
+ CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ temporaryFolder.newFolder().toURI().toString());
+ configuration.set(
+ CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+ ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+ // Force Aligned Checkpoints. This is necessary to prevent test
environment randomization
+ // from overriding it. The elements may not all be sent to sink when
unaligned checkpoints
+ // enabled(refer to FLINK-26882 for more details).
+ configuration.set(CheckpointingOptions.ENABLE_UNALIGNED, false);
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(parallelism);
if (0 < maxParallelism) {
env.getConfig().setMaxParallelism(maxParallelism);
}
env.enableCheckpointing(checkpointingInterval);
- CheckpointStorageUtils.configureFileSystemCheckpointStorage(
- env, temporaryFolder.newFolder().toURI());
- env.getCheckpointConfig()
- .setExternalizedCheckpointRetention(
-
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
- RestartStrategyUtils.configureNoRestartStrategy(env);
env.getConfig().setUseSnapshotCompression(true);
SharedReference<JobID> jobID = sharedObjects.add(new JobID());