This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4492ac53af74a6a591f6b87a869742df11e7e576
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu May 14 20:34:22 2020 +0200

    [FLINK-17258][network][test] Run 
ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned 
checkpoints
    
    This test needs many concurent checkpoint & savepoints and this is 
currently not supported with unaligned checkpoints.
---
 .../java/org/apache/flink/test/classloading/ClassLoaderITCase.java   | 4 +++-
 .../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
                                String.valueOf(parallelism),
                                checkpointDir.toURI().toString(),
                                "5000",
-                               outputDir.toURI().toString()})
+                               outputDir.toURI().toString(),
+                               "false" // Disable unaligned checkpoints as 
this test is triggering concurrent savepoints/checkpoints
+                       })
                        .build();
 
                TestStreamEnvironment.setAsContext(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
                final String checkpointPath = args[1];
                final int checkpointingInterval = Integer.parseInt(args[2]);
                final String outputPath = args[3];
+               final Optional<Boolean> unalignedCheckpoints = args.length > 4 
? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
-                               env.enableCheckpointing(checkpointingInterval);
+               env.enableCheckpointing(checkpointingInterval);
+               unalignedCheckpoints.ifPresent(value -> 
env.getCheckpointConfig().enableUnalignedCheckpoints(value));
                env.setStateBackend(new FsStateBackend(checkpointPath));
 
                DataStream<Integer> source = env.addSource(new 
InfiniteIntegerSource());

Reply via email to