This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4492ac53af74a6a591f6b87a869742df11e7e576 Author: Piotr Nowojski <[email protected]> AuthorDate: Thu May 14 20:34:22 2020 +0200 [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints This test needs many concurent checkpoint & savepoints and this is currently not supported with unaligned checkpoints. --- .../java/org/apache/flink/test/classloading/ClassLoaderITCase.java | 4 +++- .../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 6ea054a..974512c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger { String.valueOf(parallelism), checkpointDir.toURI().toString(), "5000", - outputDir.toURI().toString()}) + outputDir.toURI().toString(), + "false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints + }) .build(); TestStreamEnvironment.setAsContext( diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java index 954b8df..d6f4aa1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.InfiniteIntegerSource; import org.apache.flink.util.Collector; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; /** @@ -47,10 +48,12 @@ public class CustomKvStateProgram { final String checkpointPath = args[1]; final int checkpointingInterval = Integer.parseInt(args[2]); final String outputPath = args[3]; + final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); - env.enableCheckpointing(checkpointingInterval); + env.enableCheckpointing(checkpointingInterval); + unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value)); env.setStateBackend(new FsStateBackend(checkpointPath)); DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());
