This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f40b4eb7703ce2a99fe8f8ef775a63197d45694c
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Nov 9 19:21:30 2020 +0100

    [FLINK-19681][checkpointing] Reset channel barrier tracking from AlignedController
    
    Input channels are unaware of the controller type and always update their
    pendingCheckpointBarrierId.  Therefore, it must also be reset regardless of
    which controller (aligned or unaligned) is active.  Otherwise,
    pendingCheckpointBarrierId may be left stale when a new barrier arrives.
---
 .../flink/streaming/runtime/io/AlignedController.java    |  8 ++++++++
 .../runtime/io/AlignedControllerMassiveRandomTest.java   |  6 +++++-
 .../streaming/runtime/io/AlignedControllerTest.java      |  6 +++++-
 .../streaming/runtime/io/AlternatingControllerTest.java  | 16 ++++++++++++++++
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index 0991dc9..be6f2ef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -102,6 +102,7 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) throws IOException {
                
checkState(!barrier.getCheckpointOptions().isUnalignedCheckpoint());
+               resetPendingCheckpoint(barrier.getId());
                resumeConsumption();
                return Optional.of(barrier);
        }
@@ -110,6 +111,7 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
        public void abortPendingCheckpoint(
                        long cancelledId,
                        CheckpointException exception) throws IOException {
+               resetPendingCheckpoint(cancelledId);
                resumeConsumption();
        }
 
@@ -120,6 +122,12 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
                resumeConsumption(channelInfo);
        }
 
+       protected void resetPendingCheckpoint(long checkpointId) { // called on barrier processing and on abort, not only on cancellation
+               for (final CheckpointableInput input : inputs) { // inputs track the pending barrier id regardless of controller type
+                       input.checkpointStopped(checkpointId);
+               }
+       }
+
        public Collection<InputChannelInfo> getBlockedChannels() {
                return blockedChannels.entrySet()
                        .stream()
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
index 0dae4a3..da9bdec 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
@@ -71,7 +71,11 @@ public class AlignedControllerMassiveRandomTest {
                                                "Testing: No task associated",
                                                new DummyCheckpointInvokable(),
                                                myIG.getNumberOfInputChannels(),
-                                               new AlignedController(myIG)),
+                                               new AlignedController(myIG) {
+                                                       @Override
+                                                       protected void 
resetPendingCheckpoint(long cancelledId) {
+                                                       }
+                                       }),
                                        new SyncMailboxExecutor());
 
                        for (int i = 0; i < 2000000; i++) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
index b9eac08..4538f96 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
@@ -133,7 +133,11 @@ public class AlignedControllerTest {
                                "Testing",
                                toNotify,
                                gate.getNumberOfInputChannels(),
-                               new AlignedController(gate)),
+                               new AlignedController(gate) {
+                                       @Override
+                                       protected void 
resetPendingCheckpoint(long cancelledId) {
+                                       }
+                               }),
                        new SyncMailboxExecutor());
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 8d81df2..b12a0ad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -68,6 +68,22 @@ import static org.junit.Assert.assertFalse;
  */
 public class AlternatingControllerTest {
 
+       /**
+        * Upon subsuming (or canceling) a checkpoint, channels must be notified regardless of whether the unaligned (UC)
+        * controller is currently in use. Otherwise, a channel's pending barrier id may be stale and in-flight buffers not captured.
+        */
+       @Test
+       public void testChannelResetOnNewBarrier() throws Exception {
+               RecordingChannelStateWriter stateWriter = new 
RecordingChannelStateWriter();
+               CheckpointedInputGate gate = buildRemoteInputGate(new 
ValidatingCheckpointHandler(), 2, stateWriter);
+
+               sendBarrier(0, System.currentTimeMillis(), SAVEPOINT, gate, 0); 
// savepoint keeps the aligned controller (AC) active; a UC barrier here would require ordering in the gate while polling
+               ((RemoteInputChannel) 
gate.getChannel(0)).onBuffer(createBuffer(1024), 1, 0); // in-flight buffer on channel 0 that unaligned checkpoint 1 must capture
+               send(toBuffer(new CheckpointBarrier(1, 
System.currentTimeMillis(), unaligned(getDefault())), true), 1, gate);
+
+               assertFalse(stateWriter.getAddedInput().isEmpty()); // the channel-0 buffer was recorded by the state writer
+       }
+
        @Test
        public void testCheckpointHandling() throws Exception {
                testBarrierHandling(CHECKPOINT);

Reply via email to