This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf3b861c0b564f88df9f0e00fb9b90ab63f2a188
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Dec 3 11:21:50 2020 +0100

    [FLINK-19681][checkpointing] Address minor feedback
---
 .../runtime/checkpoint/CheckpointOptions.java      |  4 +--
 .../runtime/io/network/api/CheckpointBarrier.java  |  2 +-
 .../io/network/partition/PrioritizedDeque.java     |  2 +-
 .../streaming/runtime/io/AlignedController.java    |  1 +
 .../runtime/io/AlternatingController.java          | 31 ++++++++--------------
 .../runtime/io/StreamTaskNetworkInputTest.java     |  2 +-
 .../io/UnalignedControllerCancellationTest.java    |  2 +-
 7 files changed, 18 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index b092a92..1f12e7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -98,7 +98,7 @@ public class CheckpointOptions implements Serializable {
        }
 
        public boolean isTimeoutable() {
-               return alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT;
+               return !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT);
        }
 
        // ------------------------------------------------------------------------
@@ -190,7 +190,7 @@ public class CheckpointOptions implements Serializable {
                        alignmentTimeout);
        }
 
-       public CheckpointOptions toTimeouted() {
+       public CheckpointOptions asTimedOut() {
                checkState(checkpointType == CheckpointType.CHECKPOINT);
                return create(
                        checkpointType,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 8058f7b..e734616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -119,6 +119,6 @@ public class CheckpointBarrier extends RuntimeEvent {
        }
 
        public CheckpointBarrier asUnaligned() {
-               return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toTimeouted());
+               return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
index 77108d0..7b7c282 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
@@ -150,7 +150,7 @@ public final class PrioritizedDeque<T> implements Iterable<T> {
         */
        public T getAndRemove(Predicate<T> preCondition) {
                Iterator<T> iterator = deque.iterator();
-               for (int i = 0; i < deque.size(); i++) {
+               for (int i = 0; iterator.hasNext(); i++) {
                        T next = iterator.next();
                        if (preCondition.test(next)) {
                                if (i < numPriorityElements) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index be6f2ef..033c626 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -76,6 +76,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 
        @Override
        public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
+               sequenceNumberInAnnouncedChannels.clear();
        }
 
        @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 06e6bdb..080a7ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
@@ -40,7 +39,6 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
        private final UnalignedController unalignedController;
 
        private CheckpointBarrierBehaviourController activeController;
-       private long timeOutedBarrierId = -1; // used to shortcut timeout check
 
        public AlternatingController(
                        AlignedController alignedController,
@@ -52,6 +50,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
        @Override
        public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
                activeController = chooseController(barrier);
+               activeController.preProcessFirstBarrierOrAnnouncement(barrier);
        }
 
        @Override
@@ -60,7 +59,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
                        CheckpointBarrier announcedBarrier,
                        int sequenceNumber) throws IOException {
 
-               Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(announcedBarrier);
+               Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(announcedBarrier);
                announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
 
                if (maybeTimedOut.isPresent() && activeController != unalignedController) {
@@ -81,7 +80,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
                        return Optional.of(barrier);
                }
 
-               Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(barrier);
+               Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier);
                barrier = maybeTimedOut.orElse(barrier);
 
                checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent());
@@ -135,7 +134,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
        @Override
        public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-               Optional<CheckpointBarrier> maybeTimeOut = maybeTimeOut(barrier);
+               Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
                if (maybeTimeOut.isPresent() && activeController == alignedController) {
                        switchToUnaligned(channelInfo, maybeTimeOut.get());
                        checkState(activeController == unalignedController);
@@ -186,20 +185,12 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
                return isAligned(barrier) ? alignedController : unalignedController;
        }
 
-       private Optional<CheckpointBarrier> maybeTimeOut(CheckpointBarrier barrier) {
-               CheckpointOptions options = barrier.getCheckpointOptions();
-               boolean shouldTimeout = (options.isTimeoutable()) && (
-                       barrier.getId() == timeOutedBarrierId ||
-                               (System.currentTimeMillis() - barrier.getTimestamp()) > options.getAlignmentTimeout());
-               if (options.isUnalignedCheckpoint() || !shouldTimeout) {
-                       return Optional.empty();
-               }
-               else {
-                       timeOutedBarrierId = Math.max(timeOutedBarrierId, barrier.getId());
-                       return Optional.of(new CheckpointBarrier(
-                               barrier.getId(),
-                               barrier.getTimestamp(),
-                               options.toTimeouted()));
-               }
+       private Optional<CheckpointBarrier> asTimedOut(CheckpointBarrier barrier) {
+               return Optional.of(barrier).filter(this::canTimeout).map(CheckpointBarrier::asUnaligned);
+       }
+
+       private boolean canTimeout(CheckpointBarrier barrier) {
+               return barrier.getCheckpointOptions().isTimeoutable() &&
+                       barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 236550c..e916329 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
                        deserializers);
 
                inputGate.sendEvent(
-                       new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()),
+                       new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()),
                        channelId);
                inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 47436c9..82ac3fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
        }
 
        private static CheckpointBarrier checkpoint(int checkpointId) {
-               return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted());
+               return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut());
        }
 
        private static CancelCheckpointMarker cancel(int checkpointId) {

Reply via email to