This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fdbbfcec892222a3cc57f9b5d01a8cab9472d0d
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Dec 3 11:54:29 2020 +0100

    [FLINK-19681][checkpointing] Switch controller before processing the first barrier
    
    If a checkpoint announcement has already been processed and an
    unaligned-checkpoint (UC) barrier then arrives from the upstream,
    that barrier should be processed by the UC controller.
---
 .../streaming/runtime/io/AlternatingController.java  |  1 +
 .../runtime/io/AlternatingControllerTest.java        | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 7d8761a..e3f816c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -113,6 +113,7 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
                        lastSeenBarrier = barrier.getId();
                        firstBarrierArrivalTime = getArrivalTime(barrier);
                }
+               activeController = chooseController(barrier);
                return activeController.preProcessFirstBarrier(channelInfo, 
barrier);
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 34485f5..d998649 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -105,6 +105,26 @@ public class AlternatingControllerTest {
                assertFalse(stateWriter.getAddedInput().isEmpty());
        }
 
+       /**
+        * If a checkpoint announcement was processed from one channel and then 
UC-barrier arrives
+        * on another channel, this UC barrier should be processed by the UC 
controller.
+        */
+       @Test
+       public void testSwitchToUnalignedByUpstream() throws Exception {
+               SingleInputGate inputGate = new 
SingleInputGateBuilder().setNumberOfChannels(2).build();
+               inputGate.setInputChannels(new TestInputChannel(inputGate, 0), 
new TestInputChannel(inputGate, 1));
+               ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
+               SingleCheckpointBarrierHandler barrierHandler = 
barrierHandler(inputGate, target);
+               CheckpointedInputGate gate = buildGate(target, 2);
+
+               CheckpointBarrier aligned = new CheckpointBarrier(1, 
System.currentTimeMillis(), alignedWithTimeout(getDefault(), 
Integer.MAX_VALUE));
+
+               send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, 
gate); // process announcement but not the barrier
+               assertEquals(0, target.triggeredCheckpointCounter);
+               send(toBuffer(aligned.asUnaligned(), true), 1, gate); // 
pretend it came from upstream before the first (AC) barrier was picked up
+               assertEquals(1, target.triggeredCheckpointCounter);
+       }
+
        @Test
        public void testCheckpointHandling() throws Exception {
                testBarrierHandling(CHECKPOINT);

Reply via email to