This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d446d2a768d66e01ac6e473e953e95b2f4ac1b0
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Nov 3 18:30:23 2020 +0100

    [FLINK-19681][checkpointing] Choose controller before processing first 
barrier or announcement
---
 .../streaming/runtime/io/AlignedController.java    |  4 ++
 .../runtime/io/AlternatingController.java          |  7 +++-
 .../io/CheckpointBarrierBehaviourController.java   |  5 +++
 .../runtime/io/SingleCheckpointBarrierHandler.java | 46 ++++++++++++----------
 .../streaming/runtime/io/UnalignedController.java  |  4 ++
 5 files changed, 45 insertions(+), 21 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index 8f0e409..f3999a4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -49,6 +49,10 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
        }
 
        @Override
+       public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier 
barrier) {
+       }
+
+       @Override
        public void barrierReceived(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index eb302ce..d040c66 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -44,6 +44,11 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
        }
 
        @Override
+       public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier 
barrier) {
+               activeController = chooseController(barrier);
+       }
+
+       @Override
        public void barrierReceived(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) {
                checkActiveController(barrier);
                activeController.barrierReceived(channelInfo, barrier);
@@ -53,7 +58,7 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
        public boolean preProcessFirstBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) throws IOException, 
CheckpointException {
-               activeController = chooseController(barrier);
+               checkActiveController(barrier);
                return activeController.preProcessFirstBarrier(channelInfo, 
barrier);
        }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
index f38d745..5939451 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
@@ -32,6 +32,11 @@ import java.io.IOException;
 public interface CheckpointBarrierBehaviourController {
 
        /**
+        * Invoked before first {@link CheckpointBarrier} or its announcement.
+        */
+       void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier);
+
+       /**
         * Invoked per every received {@link CheckpointBarrier}.
         */
        void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier 
barrier);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
index ef8eb45..fa1ee00 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
@@ -66,6 +66,8 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
         */
        private long currentCheckpointId = -1L;
 
+       private long lastCancelledOrCompletedCheckpointId = -1L;
+
        private int numOpenChannels;
 
        private CompletableFuture<Void> allBarriersReceivedFuture = 
FutureUtils.completedVoidFuture();
@@ -105,18 +107,14 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                        return;
                }
 
-               if (currentCheckpointId < barrierId) {
-                       if (isCheckpointPending()) {
-                               cancelSubsumedCheckpoint(barrierId);
-                       }
+               checkSubsumedCheckpoint(channelInfo, barrier);
 
+               if (numBarriersReceived == 0) {
                        if (getNumOpenChannels() == 1) {
                                
markAlignmentStartAndEnd(barrier.getTimestamp());
                        } else {
                                markAlignmentStart(barrier.getTimestamp());
                        }
-                       currentCheckpointId = barrierId;
-                       numBarriersReceived = 0;
                        allBarriersReceivedFuture = new CompletableFuture<>();
                        try {
                                if 
(controller.preProcessFirstBarrier(channelInfo, barrier)) {
@@ -140,6 +138,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                                        markAlignmentEnd();
                                }
                                numBarriersReceived = 0;
+                               lastCancelledOrCompletedCheckpointId = 
currentCheckpointId;
                                if 
(controller.postProcessLastBarrier(channelInfo, barrier)) {
                                        LOG.debug("{}: Triggering checkpoint {} 
on the last barrier at {}.",
                                                taskName,
@@ -157,9 +156,22 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                        CheckpointBarrier announcedBarrier,
                        int sequenceNumber,
                        InputChannelInfo channelInfo) throws IOException {
+               checkSubsumedCheckpoint(channelInfo, announcedBarrier);
                // TODO: FLINK-19681
        }
 
+       private void checkSubsumedCheckpoint(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException {
+               long barrierId = barrier.getId();
+               if (currentCheckpointId < barrierId) {
+                       if (isCheckpointPending()) {
+                               cancelSubsumedCheckpoint(currentCheckpointId);
+                       }
+                       currentCheckpointId = barrierId;
+                       numBarriersReceived = 0;
+                       
controller.preProcessFirstBarrierOrAnnouncement(barrier);
+               }
+       }
+
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws IOException {
                final long cancelledId = cancelBarrier.getCheckpointId();
@@ -168,10 +180,15 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                }
        }
 
+       private void abortInternal(long cancelledId, CheckpointFailureReason 
reason) throws IOException {
+               abortInternal(cancelledId, new CheckpointException(reason));
+       }
+
        private void abortInternal(long cancelledId, CheckpointException 
exception) throws IOException {
                // by setting the currentCheckpointId to this checkpoint while 
keeping the numBarriers
                // at zero means that no checkpoint barrier can start a new 
alignment
                currentCheckpointId = Math.max(cancelledId, 
currentCheckpointId);
+               lastCancelledOrCompletedCheckpointId = 
Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
                numBarriersReceived = 0;
                controller.abortPendingCheckpoint(cancelledId, exception);
                allBarriersReceivedFuture.completeExceptionally(exception);
@@ -187,11 +204,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                                "{}: Received EndOfPartition(-1) before 
completing current checkpoint {}. Skipping current checkpoint.",
                                taskName,
                                currentCheckpointId);
-                       numBarriersReceived = 0;
-                       CheckpointException exception = new 
CheckpointException(CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
-                       controller.abortPendingCheckpoint(currentCheckpointId, 
exception);
-                       
allBarriersReceivedFuture.completeExceptionally(exception);
-                       notifyAbort(currentCheckpointId, exception);
+                       abortInternal(currentCheckpointId, 
CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
                }
        }
 
@@ -208,23 +221,16 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
 
        @Override
        protected boolean isCheckpointPending() {
-               return numBarriersReceived > 0;
+               return currentCheckpointId != 
lastCancelledOrCompletedCheckpointId && currentCheckpointId >= 0;
        }
 
        private void cancelSubsumedCheckpoint(long barrierId) throws 
IOException {
-               CheckpointException exception = new 
CheckpointException("Barrier id: " + barrierId,
-                       CHECKPOINT_DECLINED_SUBSUMED);
-               // we did not complete the current checkpoint, another started 
before
                LOG.warn("{}: Received checkpoint barrier for checkpoint {} 
before completing current checkpoint {}. " +
                                "Skipping current checkpoint.",
                        taskName,
                        barrierId,
                        currentCheckpointId);
-
-               // let the task know we are not completing this
-               controller.abortPendingCheckpoint(currentCheckpointId, 
exception);
-               allBarriersReceivedFuture.completeExceptionally(exception);
-               notifyAbort(currentCheckpointId, exception);
+               abortInternal(currentCheckpointId, 
CHECKPOINT_DECLINED_SUBSUMED);
        }
 
        public CompletableFuture<Void> getAllBarriersReceivedFuture(long 
checkpointId) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index d53b4ec..3d5f575 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -44,6 +44,10 @@ public class UnalignedController implements 
CheckpointBarrierBehaviourController
        }
 
        @Override
+       public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier 
barrier) {
+       }
+
+       @Override
        public void barrierReceived(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) {
        }
 

Reply via email to