This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cf3b861c0b564f88df9f0e00fb9b90ab63f2a188 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Dec 3 11:21:50 2020 +0100 [FLINK-19681][checkpointing] Address minor feedback --- .../runtime/checkpoint/CheckpointOptions.java | 4 +-- .../runtime/io/network/api/CheckpointBarrier.java | 2 +- .../io/network/partition/PrioritizedDeque.java | 2 +- .../streaming/runtime/io/AlignedController.java | 1 + .../runtime/io/AlternatingController.java | 31 ++++++++-------------- .../runtime/io/StreamTaskNetworkInputTest.java | 2 +- .../io/UnalignedControllerCancellationTest.java | 2 +- 7 files changed, 18 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java index b092a92..1f12e7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java @@ -98,7 +98,7 @@ public class CheckpointOptions implements Serializable { } public boolean isTimeoutable() { - return alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT; + return !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT); } // ------------------------------------------------------------------------ @@ -190,7 +190,7 @@ public class CheckpointOptions implements Serializable { alignmentTimeout); } - public CheckpointOptions toTimeouted() { + public CheckpointOptions asTimedOut() { checkState(checkpointType == CheckpointType.CHECKPOINT); return create( checkpointType, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java index 8058f7b..e734616 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java @@ -119,6 +119,6 @@ public class CheckpointBarrier extends RuntimeEvent { } public CheckpointBarrier asUnaligned() { - return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toTimeouted()); + return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java index 77108d0..7b7c282 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java @@ -150,7 +150,7 @@ public final class PrioritizedDeque<T> implements Iterable<T> { */ public T getAndRemove(Predicate<T> preCondition) { Iterator<T> iterator = deque.iterator(); - for (int i = 0; i < deque.size(); i++) { + for (int i = 0; iterator.hasNext(); i++) { T next = iterator.next(); if (preCondition.test(next)) { if (i < numPriorityElements) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java index be6f2ef..033c626 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java @@ -76,6 +76,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController { @Override public void 
preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) { + sequenceNumberInAnnouncedChannels.clear(); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index 06e6bdb..080a7ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -40,7 +39,6 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll private final UnalignedController unalignedController; private CheckpointBarrierBehaviourController activeController; - private long timeOutedBarrierId = -1; // used to shortcut timeout check public AlternatingController( AlignedController alignedController, @@ -52,6 +50,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll @Override public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) { activeController = chooseController(barrier); + activeController.preProcessFirstBarrierOrAnnouncement(barrier); } @Override @@ -60,7 +59,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll CheckpointBarrier announcedBarrier, int sequenceNumber) throws IOException { - Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(announcedBarrier); + Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(announcedBarrier); announcedBarrier = 
maybeTimedOut.orElse(announcedBarrier); if (maybeTimedOut.isPresent() && activeController != unalignedController) { @@ -81,7 +80,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll return Optional.of(barrier); } - Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(barrier); + Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier); barrier = maybeTimedOut.orElse(barrier); checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent()); @@ -135,7 +134,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll @Override public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { - Optional<CheckpointBarrier> maybeTimeOut = maybeTimeOut(barrier); + Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier); if (maybeTimeOut.isPresent() && activeController == alignedController) { switchToUnaligned(channelInfo, maybeTimeOut.get()); checkState(activeController == unalignedController); @@ -186,20 +185,12 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll return isAligned(barrier) ? 
alignedController : unalignedController; } - private Optional<CheckpointBarrier> maybeTimeOut(CheckpointBarrier barrier) { - CheckpointOptions options = barrier.getCheckpointOptions(); - boolean shouldTimeout = (options.isTimeoutable()) && ( - barrier.getId() == timeOutedBarrierId || - (System.currentTimeMillis() - barrier.getTimestamp()) > options.getAlignmentTimeout()); - if (options.isUnalignedCheckpoint() || !shouldTimeout) { - return Optional.empty(); - } - else { - timeOutedBarrierId = Math.max(timeOutedBarrierId, barrier.getId()); - return Optional.of(new CheckpointBarrier( - barrier.getId(), - barrier.getTimestamp(), - options.toTimeouted())); - } + private Optional<CheckpointBarrier> asTimedOut(CheckpointBarrier barrier) { + return Optional.of(barrier).filter(this::canTimeout).map(CheckpointBarrier::asUnaligned); + } + + private boolean canTimeout(CheckpointBarrier barrier) { + return barrier.getCheckpointOptions().isTimeoutable() && + barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp()); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java index 236550c..e916329 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java @@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest { deserializers); inputGate.sendEvent( - new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()), + new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()), channelId); inputGate.sendElement(new StreamRecord<>(42L), channelId); diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java index 47436c9..82ac3fe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java @@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest { } private static CheckpointBarrier checkpoint(int checkpointId) { - return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()); + return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()); } private static CancelCheckpointMarker cancel(int checkpointId) {
