This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9fdbbfcec892222a3cc57f9b5d01a8cab9472d0d Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Dec 3 11:54:29 2020 +0100 [FLINK-19681][checkpointing] Switch controller before processing the first barrier If a checkpoint announcement was processed and a UC-barrier then arrives (from the upstream), that barrier should be processed by the UC controller. --- .../streaming/runtime/io/AlternatingController.java | 1 + .../runtime/io/AlternatingControllerTest.java | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index 7d8761a..e3f816c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -113,6 +113,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll lastSeenBarrier = barrier.getId(); firstBarrierArrivalTime = getArrivalTime(barrier); } + activeController = chooseController(barrier); return activeController.preProcessFirstBarrier(channelInfo, barrier); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java index 34485f5..d998649 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java @@ -105,6 +105,26 @@ public class AlternatingControllerTest { assertFalse(stateWriter.getAddedInput().isEmpty()); } + /** + * If a checkpoint announcement was processed from one channel and then UC-barrier arrives + * on another 
channel, this UC barrier should be processed by the UC controller. + */ + @Test + public void testSwitchToUnalignedByUpstream() throws Exception { + SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build(); + inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)); + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target); + CheckpointedInputGate gate = buildGate(target, 2); + + CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE)); + + send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier + assertEquals(0, target.triggeredCheckpointCounter); + send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up + assertEquals(1, target.triggeredCheckpointCounter); + } + @Test public void testCheckpointHandling() throws Exception { testBarrierHandling(CHECKPOINT);
