This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2af2740bd5251b7cc39a56f9b73ace1f1cfbb871
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Jun 22 20:16:33 2021 +0300

    [FLINK-23041][streaming] The timeout for switching from an aligned checkpoint (AC) to an unaligned checkpoint (UC) is now calculated from the global checkpoint start time rather than from the local arrival time of the first barrier
---
 .../SingleCheckpointBarrierHandler.java            | 16 +++--
 .../checkpointing/AlternatingCheckpointsTest.java  | 74 ++++++++++++++++++++--
 2 files changed, 80 insertions(+), 10 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 83c324c..fcca333 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -82,7 +82,6 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
     private CompletableFuture<Void> allBarriersReceivedFuture = new 
CompletableFuture<>();
 
     private BarrierHandlerState currentState;
-    private long firstBarrierArrivalTime;
     private Cancellable currentAlignmentTimer;
     private final boolean alternating;
 
@@ -271,6 +270,13 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
     }
 
     private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
+        long alignedCheckpointTimeout =
+                announcedBarrier.getCheckpointOptions().getAlignmentTimeout();
+        long timePassedSinceCheckpointStart =
+                getClock().absoluteTimeMillis() - 
announcedBarrier.getTimestamp();
+
+        long timerDelay = Math.max(alignedCheckpointTimeout - 
timePassedSinceCheckpointStart, 0);
+
         this.currentAlignmentTimer =
                 registerTimer.apply(
                         () -> {
@@ -290,8 +296,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                             currentAlignmentTimer = null;
                             return null;
                         },
-                        Duration.ofMillis(
-                                
announcedBarrier.getCheckpointOptions().getAlignmentTimeout()));
+                        Duration.ofMillis(timerDelay));
     }
 
     private void checkNewCheckpoint(CheckpointBarrier barrier) throws 
IOException {
@@ -306,7 +311,6 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
         currentCheckpointId = barrierId;
         numBarriersReceived = 0;
         allBarriersReceivedFuture = new CompletableFuture<>();
-        firstBarrierArrivalTime = getClock().relativeTimeNanos();
 
         if (alternating && barrier.getCheckpointOptions().isTimeoutable()) {
             registerAlignmentTimer(barrier);
@@ -432,8 +436,8 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
         public boolean isTimedOut(CheckpointBarrier barrier) {
             return barrier.getCheckpointOptions().isTimeoutable()
                     && barrier.getId() <= currentCheckpointId
-                    && barrier.getCheckpointOptions().getAlignmentTimeout() * 
1_000_000
-                            < (getClock().relativeTimeNanos() - 
firstBarrierArrivalTime);
+                    && barrier.getCheckpointOptions().getAlignmentTimeout()
+                            < (getClock().absoluteTimeMillis() - 
barrier.getTimestamp());
         }
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index d4acc1f..456e8af 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -354,6 +354,30 @@ public class AlternatingCheckpointsTest {
     }
 
     @Test
+    public void testTimeoutAlignmentBeforeFirstBarrier() throws Exception {
+        // given: Local channels.
+        int numChannels = 2;
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        CheckpointedInputGate gate =
+                new TestCheckpointedInputGateBuilder(
+                                numChannels, 
getTestBarrierHandlerFactory(target))
+                        .withTestChannels()
+                        .withMailboxExecutor()
+                        .build();
+
+        long alignedCheckpointTimeout = 100;
+        // when: The aligned checkpoint timeout expired before the first barrier was received.
+        Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
+
+        ((TestInputChannel) 
gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
+
+        // then: The UC is triggered as soon as the first barrier is received.
+        assertBarrier(gate);
+        assertEquals(1, target.getTriggeredCheckpointCounter());
+    }
+
+    @Test
     public void testTimeoutAlignmentWhenLocalBarrierFirst() throws Exception {
         // given: Gate with remote and local channels.
         int numChannels = 3;
@@ -737,6 +761,48 @@ public class AlternatingCheckpointsTest {
     }
 
     @Test
+    public void testActiveTimeoutBeforeFirstAnnouncementPassiveTimeout() 
throws Exception {
+        // given: Two barriers from two channels.
+        int numChannels = 2;
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        try (CheckpointedInputGate gate =
+                new TestCheckpointedInputGateBuilder(
+                                numChannels, 
getTestBarrierHandlerFactory(target))
+                        .withRemoteChannels()
+                        .withMailboxExecutor()
+                        .build()) {
+            long alignmentCheckpointTimeout = 10;
+            Buffer checkpointBarrier = withTimeout(alignmentCheckpointTimeout);
+
+            getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
+            getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 1, 
0);
+            getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
+            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 
0);
+
+            assertEquals(0, target.getTriggeredCheckpointCounter());
+
+            // when: Receipt of the first announcement is delayed by more than the aligned
+            // checkpoint timeout.
+            clock.advanceTimeWithoutRunningCallables(
+                    alignmentCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
+
+            assertAnnouncement(gate);
+            // Simulate the active timeout firing after the passive one.
+            clock.executeCallables();
+
+            // then: Barriers should be reprioritized and the UC should be triggered.
+            assertAnnouncement(gate);
+            assertBarrier(gate);
+            assertBarrier(gate);
+            assertEquals(1, target.getTriggeredCheckpointCounter());
+            assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
+            // Followed by overtaken buffers
+            assertData(gate);
+            assertData(gate);
+        }
+    }
+
+    @Test
     public void testActiveTimeoutAfterBarrierPassiveTimeout() throws Exception 
{
         int numChannels = 2;
         ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
@@ -1112,15 +1178,15 @@ public class AlternatingCheckpointsTest {
         while (checkpointedGate.pollNext().isPresent()) {}
     }
 
-    private Buffer withTimeout(long alignmentTimeout) throws IOException {
-        return withTimeout(1, alignmentTimeout);
+    private Buffer withTimeout(long alignedCheckpointTimeout) throws 
IOException {
+        return withTimeout(1, alignedCheckpointTimeout);
     }
 
-    private Buffer withTimeout(int checkpointId, long alignmentTimeout) throws 
IOException {
+    private Buffer withTimeout(int checkpointId, long 
alignedCheckpointTimeout) throws IOException {
         return barrier(
                 checkpointId,
                 clock.relativeTimeMillis(),
-                alignedWithTimeout(getDefault(), alignmentTimeout));
+                alignedWithTimeout(getDefault(), alignedCheckpointTimeout));
     }
 
     private Buffer barrier(long barrierId, long barrierTimestamp, 
CheckpointOptions options)

Reply via email to