This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b4d170b7872b1fe3451ebe7b675a12f3343a82a Author: Arvid Heise <[email protected]> AuthorDate: Mon Jun 22 11:51:42 2020 +0200 [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE. --- .../api/graph/StreamingJobGraphGenerator.java | 7 ++++- .../streaming/runtime/io/InputProcessorUtil.java | 4 +++ .../api/graph/StreamingJobGraphGeneratorTest.java | 32 ++++++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 9022eaa..0e0b4b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -224,6 +224,11 @@ public class StreamingJobGraphGenerator { } } } + + if (checkpointConfig.isUnalignedCheckpointsEnabled() && getCheckpointingMode(checkpointConfig) != CheckpointingMode.EXACTLY_ONCE) { + LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE"); + checkpointConfig.enableUnalignedCheckpoints(false); + } } private void setPhysicalEdges() { @@ -500,8 +505,8 @@ public class StreamingJobGraphGenerator { config.setStateBackend(streamGraph.getStateBackend()); config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled()); - config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled()); config.setCheckpointMode(getCheckpointingMode(checkpointCfg)); + config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled()); for (int i = 0; i < vertex.getStatePartitioners().length; i++) { config.setStatePartitioner(i, vertex.getStatePartitioners()[i]); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 3ed8584..4969edc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -111,6 +111,10 @@ public class InputProcessorUtil { } return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates); case AT_LEAST_ONCE: + if (config.isUnalignedCheckpointsEnabled()) { + throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " + + "checkpointing mode"); + } int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum(); return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint); default: diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index eca6883..42edc70 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -193,6 +193,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } @Test + public void testEnabledUnalignedCheckAndDisabledCheckpointing() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(0).print(); + StreamGraph streamGraph = env.getStreamGraph(); + assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); + env.getCheckpointConfig().enableUnalignedCheckpoints(true); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration()); + assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode()); + assertFalse(streamConfig.isUnalignedCheckpointsEnabled()); + } + + @Test + public void testUnalignedCheckAndAtLeastOnce() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(0).print(); + StreamGraph streamGraph = env.getStreamGraph(); + env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); + env.getCheckpointConfig().enableUnalignedCheckpoints(true); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration()); + assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode()); + assertFalse(streamConfig.isUnalignedCheckpointsEnabled()); + } + + @Test public void generatorForwardsSavepointRestoreSettings() { StreamGraph streamGraph = new StreamGraph( new ExecutionConfig(),
