This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6d89fe5d5ed6daefe75657e9c6bf75dfadb07bb
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Thu Jul 15 11:34:25 2021 +0200

    [FLINK-23201][streaming] Reset alignment only for the currently processed checkpoint
---
 .../io/checkpointing/CheckpointBarrierHandler.java |  9 +++++--
 .../io/checkpointing/CheckpointBarrierTracker.java |  2 ++
 .../SingleCheckpointBarrierHandler.java            |  3 +++
 .../checkpointing/AlternatingCheckpointsTest.java  | 28 ++++++++++++++++++++++
 .../CheckpointBarrierTrackerTest.java              | 26 ++++++++++++++++++++
 5 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
index ceda88c..879f675 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The {@link CheckpointBarrierHandler} reacts to checkpoint barrier arriving from the input
@@ -143,7 +144,6 @@ public abstract class CheckpointBarrierHandler implements Closeable {
     }
 
     protected void notifyAbort(long checkpointId, CheckpointException cause) throws IOException {
-        resetAlignment();
         toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
     }
 
@@ -166,6 +166,11 @@ public abstract class CheckpointBarrierHandler implements Closeable {
     }
 
     protected void markAlignmentEnd(long alignmentDuration) {
+        checkState(
+                alignmentDuration >= 0,
+                "Alignment time is less than zero({}). Is the time monotonic?",
+                alignmentDuration);
+
         latestAlignmentDurationNanos.complete(alignmentDuration);
         latestBytesProcessedDuringAlignment.complete(bytesProcessedDuringAlignment);
 
@@ -173,7 +178,7 @@ public abstract class CheckpointBarrierHandler implements Closeable {
         bytesProcessedDuringAlignment = 0;
     }
 
-    private void resetAlignment() {
+    protected void resetAlignment() {
         markAlignmentEnd(0);
         latestAlignmentDurationNanos = new CompletableFuture<>();
         latestBytesProcessedDuringAlignment = new CompletableFuture<>();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
index be403ce..34683fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
@@ -166,6 +166,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
 
         // fast path for single channel trackers
         if (totalNumberOfInputChannels == 1) {
+            resetAlignment();
             notifyAbortOnCancellationBarrier(checkpointId);
             return;
         }
@@ -225,6 +226,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
                                 CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
             }
         }
+        resetAlignment();
     }
 
     public long getLatestCheckpointId() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 45b5693..c3a6a38 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -346,6 +346,9 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
         numBarriersReceived = 0;
         resetAlignmentTimer();
         currentState = currentState.abort(cancelledId);
+        if (cancelledId == currentCheckpointId) {
+            resetAlignment();
+        }
         notifyAbort(cancelledId, exception);
         allBarriersReceivedFuture.completeExceptionally(exception);
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index a2e55fc..5837d67 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -1042,6 +1042,34 @@ public class AlternatingCheckpointsTest {
         assertFalse(secondChannel.isBlocked());
     }
 
+    @Test
+    public void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception {
+        int numberOfChannels = 2;
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        CheckpointedInputGate gate =
+                new TestCheckpointedInputGateBuilder(
+                                numberOfChannels, getTestBarrierHandlerFactory(target))
+                        .withTestChannels()
+                        .withSyncExecutor()
+                        .build();
+
+        long alignmentTimeout = 10000;
+        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+
+        send(checkpointBarrier, 0, gate);
+        clock.advanceTime(Duration.ofSeconds(1));
+        send(withTimeout(2, alignmentTimeout), 0, gate);
+        clock.advanceTime(Duration.ofSeconds(1));
+        send(toBuffer(new CancelCheckpointMarker(1L), true), 1, gate);
+        clock.advanceTime(Duration.ofSeconds(1));
+        send(withTimeout(2, alignmentTimeout), 1, gate);
+        clock.advanceTime(Duration.ofSeconds(1));
+
+        assertEquals(
+                Duration.ofSeconds(2).toNanos(),
+                target.lastAlignmentDurationNanos.get().longValue());
+    }
+
     private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
         final long barrierId = 123L;
         ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
index 654fd6e..d0618bd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
@@ -345,6 +345,32 @@ public class CheckpointBarrierTrackerTest {
     }
 
     @Test
+    public void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception {
+        BufferOrEvent[] sequence = {
+            // start checkpoint 1
+            createBarrier(1, 1),
+            //  start checkpoint 2(just suppose checkpoint 1 was canceled)
+            createBarrier(2, 1),
+            // cancellation barrier of checkpoint 1
+            createCancellationBarrier(1, 0),
+            //  finish the checkpoint 2
+            createBarrier(2, 0)
+        };
+
+        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler();
+        ManualClock manualClock = new ManualClock();
+        inputGate = createCheckpointedInputGate(2, sequence, validator, manualClock);
+
+        for (BufferOrEvent boe : sequence) {
+            assertEquals(boe, inputGate.pollNext().get());
+            manualClock.advanceTime(Duration.ofSeconds(1));
+        }
+        assertEquals(
+                Duration.ofSeconds(2).toNanos(),
+                validator.lastAlignmentDurationNanos.get().longValue());
+    }
+
+    @Test
     public void testSingleChannelAbortCheckpoint() throws Exception {
         BufferOrEvent[] sequence = {
             createBuffer(0),

Reply via email to