This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b16a62a6a6a397817891a485ad87ca9772d8be52
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Dec 2 14:36:12 2020 +0100

    [FLINK-19681][checkpointing] Use time of start of alignment instead of checkpoint to timeout
---
 .../streaming/runtime/io/AlternatingController.java     | 17 ++++++++++++++++-
 .../streaming/runtime/io/AlternatingControllerTest.java |  1 +
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 080a7ce..7d8761a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -39,6 +39,8 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	private final UnalignedController unalignedController;
 
 	private CheckpointBarrierBehaviourController activeController;
+	private long firstBarrierArrivalTime = Long.MAX_VALUE;
+	private long lastSeenBarrier = -1L;
 
 	public AlternatingController(
 			AlignedController alignedController,
@@ -58,6 +60,10 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 			InputChannelInfo channelInfo,
 			CheckpointBarrier announcedBarrier,
 			int sequenceNumber) throws IOException {
+		if (lastSeenBarrier < announcedBarrier.getId()) {
+			lastSeenBarrier = announcedBarrier.getId();
+			firstBarrierArrivalTime = getArrivalTime(announcedBarrier);
+		}
 
 		Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(announcedBarrier);
 		announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
 
@@ -103,6 +109,10 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	public Optional<CheckpointBarrier> preProcessFirstBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException, CheckpointException {
+		if (lastSeenBarrier < barrier.getId()) {
+			lastSeenBarrier = barrier.getId();
+			firstBarrierArrivalTime = getArrivalTime(barrier);
+		}
 		return activeController.preProcessFirstBarrier(channelInfo, barrier);
 	}
 
@@ -191,6 +201,11 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
 	private boolean canTimeout(CheckpointBarrier barrier) {
 		return barrier.getCheckpointOptions().isTimeoutable() &&
-			barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+			barrier.getId() <= lastSeenBarrier &&
+			barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+	}
+
+	private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+		return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 7ecf8cb..34485f5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -199,6 +199,7 @@ public class AlternatingControllerTest {
 		// First announcements and prioritsed barriers
 		List<AbstractEvent> events = new ArrayList<>();
 		events.add(gate.pollNext().get().getEvent());
+		Thread.sleep(alignmentTimeout * 2);
 		events.add(gate.pollNext().get().getEvent());
 		events.add(gate.pollNext().get().getEvent());
 		events.add(gate.pollNext().get().getEvent());
