This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c6b0265e954da45386042bd2ead0102aabe2d1d Author: Anton Kalashnikov <[email protected]> AuthorDate: Thu Jul 15 11:30:52 2021 +0200 [FLINK-23201][streaming] Calculate checkpoint alignment time only for last started checkpoint --- .../io/checkpointing/CheckpointBarrierHandler.java | 29 +++++++++---- .../io/checkpointing/CheckpointBarrierTracker.java | 9 ++-- .../SingleCheckpointBarrierHandler.java | 4 +- .../CheckpointBarrierTrackerTest.java | 50 ++++++++++++++++++++-- 4 files changed, 76 insertions(+), 16 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java index 319fa44..ceda88c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java @@ -60,6 +60,9 @@ public abstract class CheckpointBarrierHandler implements Closeable { /** The timestamp as in {@link System#nanoTime()} at which the last alignment started. */ private long startOfAlignmentTimestamp = OUTSIDE_OF_ALIGNMENT; + /** ID of checkpoint for which alignment was started last. */ + private long startAlignmentCheckpointId = -1; + /** * Cumulative counter of bytes processed during alignment. Once we complete alignment, we will * put this value into the {@link #latestBytesProcessedDuringAlignment}. @@ -113,11 +116,20 @@ public abstract class CheckpointBarrierHandler implements Closeable { checkpointBarrier.getTimestamp(), System.currentTimeMillis()); - CheckpointMetricsBuilder checkpointMetrics = - new CheckpointMetricsBuilder() - .setAlignmentDurationNanos(latestAlignmentDurationNanos) - .setBytesProcessedDuringAlignment(latestBytesProcessedDuringAlignment) - .setCheckpointStartDelayNanos(latestCheckpointStartDelayNanos); + CheckpointMetricsBuilder checkpointMetrics; + if (checkpointBarrier.getId() == startAlignmentCheckpointId) { + checkpointMetrics = + new CheckpointMetricsBuilder() + .setAlignmentDurationNanos(latestAlignmentDurationNanos) + .setBytesProcessedDuringAlignment(latestBytesProcessedDuringAlignment) + .setCheckpointStartDelayNanos(latestCheckpointStartDelayNanos); + } else { + checkpointMetrics = + new CheckpointMetricsBuilder() + .setAlignmentDurationNanos(0L) + .setBytesProcessedDuringAlignment(0L) + .setCheckpointStartDelayNanos(0); + } toNotifyOnCheckpoint.triggerCheckpointOnBarrier( checkpointMetaData, checkpointBarrier.getCheckpointOptions(), checkpointMetrics); @@ -135,17 +147,18 @@ public abstract class CheckpointBarrierHandler implements Closeable { toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); } - protected void markAlignmentStartAndEnd(long checkpointCreationTimestamp) { - markAlignmentStart(checkpointCreationTimestamp); + protected void markAlignmentStartAndEnd(long checkpointId, long checkpointCreationTimestamp) { + markAlignmentStart(checkpointId, checkpointCreationTimestamp); markAlignmentEnd(0); } - protected void markAlignmentStart(long checkpointCreationTimestamp) { + protected void markAlignmentStart(long checkpointId, long checkpointCreationTimestamp) { latestCheckpointStartDelayNanos = 1_000_000 * Math.max(0, clock.absoluteTimeMillis() - checkpointCreationTimestamp); resetAlignment(); startOfAlignmentTimestamp = clock.relativeTimeNanos(); + startAlignmentCheckpointId = checkpointId; } protected void markAlignmentEnd() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java index 845e23b..be403ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java @@ -85,7 +85,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler { // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { - markAlignmentStartAndEnd(receivedBarrier.getTimestamp()); + markAlignmentStartAndEnd(barrierId, receivedBarrier.getTimestamp()); notifyCheckpoint(receivedBarrier); return; } @@ -123,7 +123,10 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler { if (LOG.isDebugEnabled()) { LOG.debug("Received all barriers for checkpoint {}", barrierId); } - markAlignmentEnd(); + // Only one calculation of the alignment time at once is supported right now. + if (barrierCount.checkpointId == latestPendingCheckpointID) { + markAlignmentEnd(); + } notifyCheckpoint(receivedBarrier); } } @@ -133,7 +136,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler { // if it is not newer than the latest checkpoint ID, then there cannot be a // successful checkpoint for that ID anyways if (barrierId > latestPendingCheckpointID) { - markAlignmentStart(receivedBarrier.getTimestamp()); + markAlignmentStart(barrierId, receivedBarrier.getTimestamp()); latestPendingCheckpointID = barrierId; pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java index 3f9a2a2..45b5693 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java @@ -209,9 +209,9 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { if (numBarriersReceived++ == 0) { if (getNumOpenChannels() == 1) { - markAlignmentStartAndEnd(barrier.getTimestamp()); + markAlignmentStartAndEnd(barrierId, barrier.getTimestamp()); } else { - markAlignmentStart(barrier.getTimestamp()); + markAlignmentStart(barrierId, barrier.getTimestamp()); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java index 8aee7c3..654fd6e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java @@ -40,6 +40,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable; import org.apache.flink.streaming.api.operators.SyncMailboxExecutor; import org.apache.flink.streaming.runtime.io.MockInputGate; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.ManualClock; import org.apache.flink.util.clock.SystemClock; import org.junit.After; @@ -48,6 +50,7 @@ import org.junit.Test; import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -552,6 +555,32 @@ public class CheckpointBarrierTrackerTest { assertThat(handler.getLastBytesProcessedDuringAlignment().get(), equalTo(0L)); } + @Test + public void testTwoLastBarriersOneByOne() throws Exception { + BufferOrEvent[] sequence = { + // start checkpoint 1 + createBarrier(1, 1), + // start checkpoint 2 + createBarrier(2, 1), + // finish the checkpoint 1 + createBarrier(1, 0), + // finish the checkpoint 2 + createBarrier(2, 0) + }; + + ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(); + ManualClock manualClock = new ManualClock(); + inputGate = createCheckpointedInputGate(2, sequence, validator, manualClock); + + for (BufferOrEvent boe : sequence) { + assertEquals(boe, inputGate.pollNext().get()); + manualClock.advanceTime(Duration.ofSeconds(1)); + } + assertEquals( + Duration.ofSeconds(2).toNanos(), + validator.lastAlignmentDurationNanos.get().longValue()); + } + // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ @@ -597,13 +626,28 @@ public class CheckpointBarrierTrackerTest { } private static CheckpointedInputGate createCheckpointedInputGate( + int numberOfChannels, + BufferOrEvent[] sequence, + @Nullable AbstractInvokable toNotifyOnCheckpoint, + Clock clock) { + MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence)); + return createCheckpointedInputGate(gate, toNotifyOnCheckpoint, clock); + } + + private static CheckpointedInputGate createCheckpointedInputGate( IndexedInputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) { + return createCheckpointedInputGate( + inputGate, toNotifyOnCheckpoint, SystemClock.getInstance()); + } + + private static CheckpointedInputGate createCheckpointedInputGate( + IndexedInputGate inputGate, + @Nullable AbstractInvokable toNotifyOnCheckpoint, + Clock clock) { return new CheckpointedInputGate( inputGate, new CheckpointBarrierTracker( - inputGate.getNumberOfInputChannels(), - toNotifyOnCheckpoint, - SystemClock.getInstance()), + inputGate.getNumberOfInputChannels(), toNotifyOnCheckpoint, clock), new SyncMailboxExecutor()); }
