This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b16a62a6a6a397817891a485ad87ca9772d8be52
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Dec 2 14:36:12 2020 +0100

    [FLINK-19681][checkpointing] Use time of start of alignment instead of checkpoint to timeout
---
 .../streaming/runtime/io/AlternatingController.java     | 17 ++++++++++++++++-
 .../streaming/runtime/io/AlternatingControllerTest.java |  1 +
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 080a7ce..7d8761a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -39,6 +39,8 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
        private final UnalignedController unalignedController;
 
        private CheckpointBarrierBehaviourController activeController;
+       private long firstBarrierArrivalTime = Long.MAX_VALUE;
+       private long lastSeenBarrier = -1L;
 
        public AlternatingController(
                        AlignedController alignedController,
@@ -58,6 +60,10 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
                        InputChannelInfo channelInfo,
                        CheckpointBarrier announcedBarrier,
                        int sequenceNumber) throws IOException {
+               if (lastSeenBarrier < announcedBarrier.getId()) {
+                       lastSeenBarrier = announcedBarrier.getId();
+                       firstBarrierArrivalTime = 
getArrivalTime(announcedBarrier);
+               }
 
                Optional<CheckpointBarrier> maybeTimedOut = 
asTimedOut(announcedBarrier);
                announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
@@ -103,6 +109,10 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
        public Optional<CheckpointBarrier> preProcessFirstBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) throws IOException, 
CheckpointException {
+               if (lastSeenBarrier < barrier.getId()) {
+                       lastSeenBarrier = barrier.getId();
+                       firstBarrierArrivalTime = getArrivalTime(barrier);
+               }
                return activeController.preProcessFirstBarrier(channelInfo, 
barrier);
        }
 
@@ -191,6 +201,11 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
 
        private boolean canTimeout(CheckpointBarrier barrier) {
                return barrier.getCheckpointOptions().isTimeoutable() &&
-                       barrier.getCheckpointOptions().getAlignmentTimeout() < 
(System.currentTimeMillis() - barrier.getTimestamp());
+                       barrier.getId() <= lastSeenBarrier &&
+                       barrier.getCheckpointOptions().getAlignmentTimeout() * 
1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+       }
+
+       private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+               return announcedBarrier.getCheckpointOptions().isTimeoutable() 
? System.nanoTime() : Long.MAX_VALUE;
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 7ecf8cb..34485f5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -199,6 +199,7 @@ public class AlternatingControllerTest {
                // First announcements and prioritsed barriers
                List<AbstractEvent> events = new ArrayList<>();
                events.add(gate.pollNext().get().getEvent());
+               Thread.sleep(alignmentTimeout * 2);
                events.add(gate.pollNext().get().getEvent());
                events.add(gate.pollNext().get().getEvent());
                events.add(gate.pollNext().get().getEvent());

Reply via email to