This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c6b0265e954da45386042bd2ead0102aabe2d1d
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Thu Jul 15 11:30:52 2021 +0200

    [FLINK-23201][streaming] Calculate checkpoint alignment time only for last started checkpoint
---
 .../io/checkpointing/CheckpointBarrierHandler.java | 29 +++++++++----
 .../io/checkpointing/CheckpointBarrierTracker.java |  9 ++--
 .../SingleCheckpointBarrierHandler.java            |  4 +-
 .../CheckpointBarrierTrackerTest.java              | 50 ++++++++++++++++++++--
 4 files changed, 76 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
index 319fa44..ceda88c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
@@ -60,6 +60,9 @@ public abstract class CheckpointBarrierHandler implements Closeable {
     /** The timestamp as in {@link System#nanoTime()} at which the last alignment started. */
     private long startOfAlignmentTimestamp = OUTSIDE_OF_ALIGNMENT;
 
+    /** ID of checkpoint for which alignment was started last. */
+    private long startAlignmentCheckpointId = -1;
+
     /**
      * Cumulative counter of bytes processed during alignment. Once we complete alignment, we will
      * put this value into the {@link #latestBytesProcessedDuringAlignment}.
@@ -113,11 +116,20 @@ public abstract class CheckpointBarrierHandler implements Closeable {
                         checkpointBarrier.getTimestamp(),
                         System.currentTimeMillis());
 
-        CheckpointMetricsBuilder checkpointMetrics =
-                new CheckpointMetricsBuilder()
-                        .setAlignmentDurationNanos(latestAlignmentDurationNanos)
-                        .setBytesProcessedDuringAlignment(latestBytesProcessedDuringAlignment)
-                        .setCheckpointStartDelayNanos(latestCheckpointStartDelayNanos);
+        CheckpointMetricsBuilder checkpointMetrics;
+        if (checkpointBarrier.getId() == startAlignmentCheckpointId) {
+            checkpointMetrics =
+                    new CheckpointMetricsBuilder()
+                            .setAlignmentDurationNanos(latestAlignmentDurationNanos)
+                            .setBytesProcessedDuringAlignment(latestBytesProcessedDuringAlignment)
+                            .setCheckpointStartDelayNanos(latestCheckpointStartDelayNanos);
+        } else {
+            checkpointMetrics =
+                    new CheckpointMetricsBuilder()
+                            .setAlignmentDurationNanos(0L)
+                            .setBytesProcessedDuringAlignment(0L)
+                            .setCheckpointStartDelayNanos(0);
+        }
 
         toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
                 checkpointMetaData, checkpointBarrier.getCheckpointOptions(), checkpointMetrics);
@@ -135,17 +147,18 @@ public abstract class CheckpointBarrierHandler implements Closeable {
         toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
     }
 
-    protected void markAlignmentStartAndEnd(long checkpointCreationTimestamp) {
-        markAlignmentStart(checkpointCreationTimestamp);
+    protected void markAlignmentStartAndEnd(long checkpointId, long checkpointCreationTimestamp) {
+        markAlignmentStart(checkpointId, checkpointCreationTimestamp);
         markAlignmentEnd(0);
     }
 
-    protected void markAlignmentStart(long checkpointCreationTimestamp) {
+    protected void markAlignmentStart(long checkpointId, long checkpointCreationTimestamp) {
         latestCheckpointStartDelayNanos =
                 1_000_000 * Math.max(0, clock.absoluteTimeMillis() - checkpointCreationTimestamp);
 
         resetAlignment();
         startOfAlignmentTimestamp = clock.relativeTimeNanos();
+        startAlignmentCheckpointId = checkpointId;
     }
 
     protected void markAlignmentEnd() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
index 845e23b..be403ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
@@ -85,7 +85,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
 
         // fast path for single channel trackers
         if (totalNumberOfInputChannels == 1) {
-            markAlignmentStartAndEnd(receivedBarrier.getTimestamp());
+            markAlignmentStartAndEnd(barrierId, receivedBarrier.getTimestamp());
             notifyCheckpoint(receivedBarrier);
             return;
         }
@@ -123,7 +123,10 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Received all barriers for checkpoint {}", barrierId);
                     }
-                    markAlignmentEnd();
+                    // Only one calculation of the alignment time at once is supported right now.
+                    if (barrierCount.checkpointId == latestPendingCheckpointID) {
+                        markAlignmentEnd();
+                    }
                     notifyCheckpoint(receivedBarrier);
                 }
             }
@@ -133,7 +136,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
             // if it is not newer than the latest checkpoint ID, then there cannot be a
             // successful checkpoint for that ID anyways
             if (barrierId > latestPendingCheckpointID) {
-                markAlignmentStart(receivedBarrier.getTimestamp());
+                markAlignmentStart(barrierId, receivedBarrier.getTimestamp());
                 latestPendingCheckpointID = barrierId;
                 pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 3f9a2a2..45b5693 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -209,9 +209,9 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 
         if (numBarriersReceived++ == 0) {
             if (getNumOpenChannels() == 1) {
-                markAlignmentStartAndEnd(barrier.getTimestamp());
+                markAlignmentStartAndEnd(barrierId, barrier.getTimestamp());
             } else {
-                markAlignmentStart(barrier.getTimestamp());
+                markAlignmentStart(barrierId, barrier.getTimestamp());
             }
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
index 8aee7c3..654fd6e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.io.MockInputGate;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.ManualClock;
 import org.apache.flink.util.clock.SystemClock;
 
 import org.junit.After;
@@ -48,6 +50,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -552,6 +555,32 @@ public class CheckpointBarrierTrackerTest {
         assertThat(handler.getLastBytesProcessedDuringAlignment().get(), equalTo(0L));
     }
 
+    @Test
+    public void testTwoLastBarriersOneByOne() throws Exception {
+        BufferOrEvent[] sequence = {
+            // start checkpoint 1
+            createBarrier(1, 1),
+            // start checkpoint 2
+            createBarrier(2, 1),
+            // finish the checkpoint 1
+            createBarrier(1, 0),
+            // finish the checkpoint 2
+            createBarrier(2, 0)
+        };
+
+        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler();
+        ManualClock manualClock = new ManualClock();
+        inputGate = createCheckpointedInputGate(2, sequence, validator, manualClock);
+
+        for (BufferOrEvent boe : sequence) {
+            assertEquals(boe, inputGate.pollNext().get());
+            manualClock.advanceTime(Duration.ofSeconds(1));
+        }
+        assertEquals(
+                Duration.ofSeconds(2).toNanos(),
+                validator.lastAlignmentDurationNanos.get().longValue());
+    }
+
     // ------------------------------------------------------------------------
     //  Utils
     // ------------------------------------------------------------------------
@@ -597,13 +626,28 @@ public class CheckpointBarrierTrackerTest {
     }
 
     private static CheckpointedInputGate createCheckpointedInputGate(
+            int numberOfChannels,
+            BufferOrEvent[] sequence,
+            @Nullable AbstractInvokable toNotifyOnCheckpoint,
+            Clock clock) {
+        MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
+        return createCheckpointedInputGate(gate, toNotifyOnCheckpoint, clock);
+    }
+
+    private static CheckpointedInputGate createCheckpointedInputGate(
             IndexedInputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+        return createCheckpointedInputGate(
+                inputGate, toNotifyOnCheckpoint, SystemClock.getInstance());
+    }
+
+    private static CheckpointedInputGate createCheckpointedInputGate(
+            IndexedInputGate inputGate,
+            @Nullable AbstractInvokable toNotifyOnCheckpoint,
+            Clock clock) {
         return new CheckpointedInputGate(
                 inputGate,
                 new CheckpointBarrierTracker(
-                        inputGate.getNumberOfInputChannels(),
-                        toNotifyOnCheckpoint,
-                        SystemClock.getInstance()),
+                        inputGate.getNumberOfInputChannels(), toNotifyOnCheckpoint, clock),
                 new SyncMailboxExecutor());
     }
 

Reply via email to