This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f11452e6a29f15eb08aeb7ac4722691914cbe6fc
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Nov 11 12:29:09 2020 +0100

    [hotfix][checkpointing] Add preconditions to channels and controllers
---
 .../org/apache/flink/streaming/runtime/io/AlignedController.java | 3 +++
 .../apache/flink/streaming/runtime/io/UnalignedController.java | 4 ++++
 .../flink/streaming/runtime/io/StreamTaskNetworkInputTest.java | 2 +-
 .../runtime/io/UnalignedControllerCancellationTest.java | 2 +-
 .../flink/streaming/runtime/io/UnalignedControllerTest.java | 9 +++++++--
 5 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index a2ef674..0991dc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -92,6 +93,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
     public Optional<CheckpointBarrier> preProcessFirstBarrier(
             InputChannelInfo channelInfo,
             CheckpointBarrier barrier) {
+        checkArgument(!barrier.getCheckpointOptions().isUnalignedCheckpoint(), "Unaligned barrier is not expected");
         return Optional.empty();
     }
 
@@ -99,6 +101,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
     public Optional<CheckpointBarrier> postProcessLastBarrier(
             InputChannelInfo channelInfo,
             CheckpointBarrier barrier) throws IOException {
+        checkState(!barrier.getCheckpointOptions().isUnalignedCheckpoint());
         resumeConsumption();
         return Optional.of(barrier);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index d19533e..6141918 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -53,6 +54,7 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
             InputChannelInfo channelInfo,
             CheckpointBarrier announcedBarrier,
             int sequenceNumber) throws IOException {
+        Preconditions.checkState(announcedBarrier.isCheckpoint());
         inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
             channelInfo.getInputChannelIdx(),
             sequenceNumber);
@@ -65,6 +67,7 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 
     @Override
     public Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
+        Preconditions.checkArgument(barrier.getCheckpointOptions().isUnalignedCheckpoint(), "Aligned barrier not expected");
         checkpointCoordinator.initCheckpoint(barrier.getId(), barrier.getCheckpointOptions());
         for (final CheckpointableInput input : inputs) {
             input.checkpointStarted(barrier);
@@ -74,6 +77,7 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 
     @Override
     public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
+        // note that barrier can be aligned if checkpoint timed out in between; event is not converted
         resetPendingCheckpoint(barrier.getId());
         return Optional.empty();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 5f7498f..236550c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
             deserializers);
 
         inputGate.sendEvent(
-            new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()),
+            new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()),
             channelId);
         inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 316560d..47436c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
     }
 
     private static CheckpointBarrier checkpoint(int checkpointId) {
-        return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation());
+        return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted());
     }
 
     private static CancelCheckpointMarker cancel(int checkpointId) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
index a365bf5..7604746 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
@@ -57,6 +59,9 @@ import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -615,7 +620,7 @@ public class UnalignedControllerTest {
             new CheckpointBarrier(
                 checkpointId,
                 timestamp,
-                CheckpointOptions.forCheckpointWithDefaultLocation()),
+                new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT)),
             new InputChannelInfo(0, channel));
     }
 
@@ -729,7 +734,7 @@ public class UnalignedControllerTest {
     }
 
     private CheckpointBarrier buildCheckpointBarrier(long id) {
-        return new CheckpointBarrier(id, 0, CheckpointOptions.forCheckpointWithDefaultLocation());
+        return new CheckpointBarrier(id, 0, new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT));
     }
 
     // ------------------------------------------------------------------------
