This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5d446d2a768d66e01ac6e473e953e95b2f4ac1b0 Author: Piotr Nowojski <[email protected]> AuthorDate: Tue Nov 3 18:30:23 2020 +0100 [FLINK-19681][checkpointing] Choose controller before processing first barrier or announcement --- .../streaming/runtime/io/AlignedController.java | 4 ++ .../runtime/io/AlternatingController.java | 7 +++- .../io/CheckpointBarrierBehaviourController.java | 5 +++ .../runtime/io/SingleCheckpointBarrierHandler.java | 46 ++++++++++++---------- .../streaming/runtime/io/UnalignedController.java | 4 ++ 5 files changed, 45 insertions(+), 21 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java index 8f0e409..f3999a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java @@ -49,6 +49,10 @@ public class AlignedController implements CheckpointBarrierBehaviourController { } @Override + public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) { + } + + @Override public void barrierReceived( InputChannelInfo channelInfo, CheckpointBarrier barrier) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index eb302ce..d040c66 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -44,6 +44,11 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll } @Override + public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) { + activeController = 
chooseController(barrier); + } + + @Override public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) { checkActiveController(barrier); activeController.barrierReceived(channelInfo, barrier); @@ -53,7 +58,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll public boolean preProcessFirstBarrier( InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { - activeController = chooseController(barrier); + checkActiveController(barrier); return activeController.preProcessFirstBarrier(channelInfo, barrier); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java index f38d745..5939451 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java @@ -32,6 +32,11 @@ import java.io.IOException; public interface CheckpointBarrierBehaviourController { /** + * Invoked before first {@link CheckpointBarrier} or it's announcement. + */ + void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier); + + /** * Invoked per every received {@link CheckpointBarrier}. 
*/ void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java index ef8eb45..fa1ee00 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java @@ -66,6 +66,8 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { */ private long currentCheckpointId = -1L; + private long lastCancelledOrCompletedCheckpointId = -1L; + private int numOpenChannels; private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture(); @@ -105,18 +107,14 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { return; } - if (currentCheckpointId < barrierId) { - if (isCheckpointPending()) { - cancelSubsumedCheckpoint(barrierId); - } + checkSubsumedCheckpoint(channelInfo, barrier); + if (numBarriersReceived == 0) { if (getNumOpenChannels() == 1) { markAlignmentStartAndEnd(barrier.getTimestamp()); } else { markAlignmentStart(barrier.getTimestamp()); } - currentCheckpointId = barrierId; - numBarriersReceived = 0; allBarriersReceivedFuture = new CompletableFuture<>(); try { if (controller.preProcessFirstBarrier(channelInfo, barrier)) { @@ -140,6 +138,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { markAlignmentEnd(); } numBarriersReceived = 0; + lastCancelledOrCompletedCheckpointId = currentCheckpointId; if (controller.postProcessLastBarrier(channelInfo, barrier)) { LOG.debug("{}: Triggering checkpoint {} on the last barrier at {}.", taskName, @@ -157,9 +156,22 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { 
CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo) throws IOException { + checkSubsumedCheckpoint(channelInfo, announcedBarrier); // TODO: FLINK-19681 } + private void checkSubsumedCheckpoint(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException { + long barrierId = barrier.getId(); + if (currentCheckpointId < barrierId) { + if (isCheckpointPending()) { + cancelSubsumedCheckpoint(currentCheckpointId); + } + currentCheckpointId = barrierId; + numBarriersReceived = 0; + controller.preProcessFirstBarrierOrAnnouncement(barrier); + } + } + @Override public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException { final long cancelledId = cancelBarrier.getCheckpointId(); @@ -168,10 +180,15 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { } } + private void abortInternal(long cancelledId, CheckpointFailureReason reason) throws IOException { + abortInternal(cancelledId, new CheckpointException(reason)); + } + private void abortInternal(long cancelledId, CheckpointException exception) throws IOException { // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers // at zero means that no checkpoint barrier can start a new alignment currentCheckpointId = Math.max(cancelledId, currentCheckpointId); + lastCancelledOrCompletedCheckpointId = Math.max(lastCancelledOrCompletedCheckpointId, cancelledId); numBarriersReceived = 0; controller.abortPendingCheckpoint(cancelledId, exception); allBarriersReceivedFuture.completeExceptionally(exception); @@ -187,11 +204,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { "{}: Received EndOfPartition(-1) before completing current checkpoint {}. 
Skipping current checkpoint.", taskName, currentCheckpointId); - numBarriersReceived = 0; - CheckpointException exception = new CheckpointException(CHECKPOINT_DECLINED_INPUT_END_OF_STREAM); - controller.abortPendingCheckpoint(currentCheckpointId, exception); - allBarriersReceivedFuture.completeExceptionally(exception); - notifyAbort(currentCheckpointId, exception); + abortInternal(currentCheckpointId, CHECKPOINT_DECLINED_INPUT_END_OF_STREAM); } } @@ -208,23 +221,16 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { @Override protected boolean isCheckpointPending() { - return numBarriersReceived > 0; + return currentCheckpointId != lastCancelledOrCompletedCheckpointId && currentCheckpointId >= 0; } private void cancelSubsumedCheckpoint(long barrierId) throws IOException { - CheckpointException exception = new CheckpointException("Barrier id: " + barrierId, - CHECKPOINT_DECLINED_SUBSUMED); - // we did not complete the current checkpoint, another started before LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. 
" + "Skipping current checkpoint.", taskName, barrierId, currentCheckpointId); - - // let the task know we are not completing this - controller.abortPendingCheckpoint(currentCheckpointId, exception); - allBarriersReceivedFuture.completeExceptionally(exception); - notifyAbort(currentCheckpointId, exception); + abortInternal(currentCheckpointId, CHECKPOINT_DECLINED_SUBSUMED); } public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java index d53b4ec..3d5f575 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java @@ -44,6 +44,10 @@ public class UnalignedController implements CheckpointBarrierBehaviourController } @Override + public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) { + } + + @Override public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) { }
