This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b4d170b7872b1fe3451ebe7b675a12f3343a82a
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Jun 22 11:51:42 2020 +0200

    [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.
---
 .../api/graph/StreamingJobGraphGenerator.java      |  7 ++++-
 .../streaming/runtime/io/InputProcessorUtil.java   |  4 +++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9022eaa..0e0b4b6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -224,6 +224,11 @@ public class StreamingJobGraphGenerator {
                                }
                        }
                }
+
+               if (checkpointConfig.isUnalignedCheckpointsEnabled() && 
getCheckpointingMode(checkpointConfig) != CheckpointingMode.EXACTLY_ONCE) {
+                       LOG.warn("Unaligned checkpoints can only be used with 
checkpointing mode EXACTLY_ONCE");
+                       checkpointConfig.enableUnalignedCheckpoints(false);
+               }
        }
 
        private void setPhysicalEdges() {
@@ -500,8 +505,8 @@ public class StreamingJobGraphGenerator {
 
                config.setStateBackend(streamGraph.getStateBackend());
                
config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
-               
config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
                config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
+               
config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
 
                for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
                        config.setStatePartitioner(i, 
vertex.getStatePartitioners()[i]);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 3ed8584..4969edc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -111,6 +111,10 @@ public class InputProcessorUtil {
                                }
                                return new CheckpointBarrierAligner(taskName, 
toNotifyOnCheckpoint, inputGates);
                        case AT_LEAST_ONCE:
+                               if (config.isUnalignedCheckpointsEnabled()) {
+                                       throw new IllegalStateException("Cannot 
use unaligned checkpoints with AT_LEAST_ONCE " +
+                                               "checkpointing mode");
+                               }
                                int numInputChannels = 
Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
                                return new 
CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
                        default:
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index eca6883..42edc70 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -193,6 +193,38 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
        }
 
        @Test
+       public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(0).print();
+               StreamGraph streamGraph = env.getStreamGraph();
+               assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+               env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               StreamConfig streamConfig = new 
StreamConfig(verticesSorted.get(0).getConfiguration());
+               assertEquals(CheckpointingMode.AT_LEAST_ONCE, 
streamConfig.getCheckpointMode());
+               assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+       }
+
+       @Test
+       public void testUnalignedCheckAndAtLeastOnce() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(0).print();
+               StreamGraph streamGraph = env.getStreamGraph();
+               env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+               env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               StreamConfig streamConfig = new 
StreamConfig(verticesSorted.get(0).getConfiguration());
+               assertEquals(CheckpointingMode.AT_LEAST_ONCE, 
streamConfig.getCheckpointMode());
+               assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+       }
+
+       @Test
        public void generatorForwardsSavepointRestoreSettings() {
                StreamGraph streamGraph = new StreamGraph(
                                new ExecutionConfig(),

Reply via email to