This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f11452e6a29f15eb08aeb7ac4722691914cbe6fc
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Nov 11 12:29:09 2020 +0100

    [hotfix][checkpointing] Add preconditions to channels and controllers
---
 .../org/apache/flink/streaming/runtime/io/AlignedController.java | 3 +++
 .../apache/flink/streaming/runtime/io/UnalignedController.java   | 4 ++++
 .../flink/streaming/runtime/io/StreamTaskNetworkInputTest.java   | 2 +-
 .../runtime/io/UnalignedControllerCancellationTest.java          | 2 +-
 .../flink/streaming/runtime/io/UnalignedControllerTest.java      | 9 +++++++--
 5 files changed, 16 insertions(+), 4 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index a2ef674..0991dc9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -92,6 +93,7 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
        public Optional<CheckpointBarrier> preProcessFirstBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) {
+               
checkArgument(!barrier.getCheckpointOptions().isUnalignedCheckpoint(), 
"Unaligned barrier is not expected");
                return Optional.empty();
        }
 
@@ -99,6 +101,7 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
        public Optional<CheckpointBarrier> postProcessLastBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) throws IOException {
+               
checkState(!barrier.getCheckpointOptions().isUnalignedCheckpoint());
                resumeConsumption();
                return Optional.of(barrier);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index d19533e..6141918 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -53,6 +54,7 @@ public class UnalignedController implements 
CheckpointBarrierBehaviourController
                        InputChannelInfo channelInfo,
                        CheckpointBarrier announcedBarrier,
                        int sequenceNumber) throws IOException {
+               Preconditions.checkState(announcedBarrier.isCheckpoint());
                inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
                        channelInfo.getInputChannelIdx(),
                        sequenceNumber);
@@ -65,6 +67,7 @@ public class UnalignedController implements 
CheckpointBarrierBehaviourController
 
        @Override
        public Optional<CheckpointBarrier> 
preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) 
throws IOException, CheckpointException {
+               
Preconditions.checkArgument(barrier.getCheckpointOptions().isUnalignedCheckpoint(),
 "Aligned barrier not expected");
                checkpointCoordinator.initCheckpoint(barrier.getId(), 
barrier.getCheckpointOptions());
                for (final CheckpointableInput input : inputs) {
                        input.checkpointStarted(barrier);
@@ -74,6 +77,7 @@ public class UnalignedController implements 
CheckpointBarrierBehaviourController
 
        @Override
        public Optional<CheckpointBarrier> 
postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) 
{
+               // note that barrier can be aligned if checkpoint timed out in 
between; event is not converted
                resetPendingCheckpoint(barrier.getId());
                return Optional.empty();
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 5f7498f..236550c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
                        deserializers);
 
                inputGate.sendEvent(
-                       new CheckpointBarrier(checkpointId, 0L, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                       new CheckpointBarrier(checkpointId, 0L, 
CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()),
                        channelId);
                inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 316560d..47436c9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
        }
 
        private static CheckpointBarrier checkpoint(int checkpointId) {
-               return new CheckpointBarrier(checkpointId, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               return new CheckpointBarrier(checkpointId, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted());
        }
 
        private static CancelCheckpointMarker cancel(int checkpointId) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
index a365bf5..7604746 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -38,6 +39,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import 
org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
@@ -57,6 +59,9 @@ import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -615,7 +620,7 @@ public class UnalignedControllerTest {
                        new CheckpointBarrier(
                                checkpointId,
                                timestamp,
-                               
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                               new CheckpointOptions(CHECKPOINT, getDefault(), 
true, true, NO_ALIGNMENT_TIME_OUT)),
                        new InputChannelInfo(0, channel));
        }
 
@@ -729,7 +734,7 @@ public class UnalignedControllerTest {
        }
 
        private CheckpointBarrier buildCheckpointBarrier(long id) {
-               return new CheckpointBarrier(id, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               return new CheckpointBarrier(id, 0, new 
CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT));
        }
 
        // 
------------------------------------------------------------------------

Reply via email to