This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b6d89fe5d5ed6daefe75657e9c6bf75dfadb07bb Author: Anton Kalashnikov <[email protected]> AuthorDate: Thu Jul 15 11:34:25 2021 +0200 [FLINK-23201][streaming] Reset alignment only for the currently processed checkpoint --- .../io/checkpointing/CheckpointBarrierHandler.java | 9 +++++-- .../io/checkpointing/CheckpointBarrierTracker.java | 2 ++ .../SingleCheckpointBarrierHandler.java | 3 +++ .../checkpointing/AlternatingCheckpointsTest.java | 28 ++++++++++++++++++++++ .../CheckpointBarrierTrackerTest.java | 26 ++++++++++++++++++++ 5 files changed, 66 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java index ceda88c..879f675 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * The {@link CheckpointBarrierHandler} reacts to checkpoint barrier arriving from the input @@ -143,7 +144,6 @@ public abstract class CheckpointBarrierHandler implements Closeable { } protected void notifyAbort(long checkpointId, CheckpointException cause) throws IOException { - resetAlignment(); toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); } @@ -166,6 +166,11 @@ public abstract class CheckpointBarrierHandler implements Closeable { } protected void markAlignmentEnd(long alignmentDuration) { + checkState( + alignmentDuration >= 0, + "Alignment time is less than zero({}). Is the time monotonic?", + alignmentDuration); + latestAlignmentDurationNanos.complete(alignmentDuration); latestBytesProcessedDuringAlignment.complete(bytesProcessedDuringAlignment); @@ -173,7 +178,7 @@ public abstract class CheckpointBarrierHandler implements Closeable { bytesProcessedDuringAlignment = 0; } - private void resetAlignment() { + protected void resetAlignment() { markAlignmentEnd(0); latestAlignmentDurationNanos = new CompletableFuture<>(); latestBytesProcessedDuringAlignment = new CompletableFuture<>(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java index be403ce..34683fc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java @@ -166,6 +166,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler { // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { + resetAlignment(); notifyAbortOnCancellationBarrier(checkpointId); return; } @@ -225,6 +226,7 @@ public class CheckpointBarrierTracker extends CheckpointBarrierHandler { CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); } } + resetAlignment(); } public long getLatestCheckpointId() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java index 45b5693..c3a6a38 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java @@ -346,6 +346,9 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { numBarriersReceived = 0; resetAlignmentTimer(); currentState = currentState.abort(cancelledId); + if (cancelledId == currentCheckpointId) { + resetAlignment(); + } notifyAbort(cancelledId, exception); allBarriersReceivedFuture.completeExceptionally(exception); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java index a2e55fc..5837d67 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java @@ -1042,6 +1042,34 @@ public class AlternatingCheckpointsTest { assertFalse(secondChannel.isBlocked()); } + @Test + public void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception { + int numberOfChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + CheckpointedInputGate gate = + new TestCheckpointedInputGateBuilder( + numberOfChannels, getTestBarrierHandlerFactory(target)) + .withTestChannels() + .withSyncExecutor() + .build(); + + long alignmentTimeout = 10000; + Buffer checkpointBarrier = withTimeout(alignmentTimeout); + + send(checkpointBarrier, 0, gate); + clock.advanceTime(Duration.ofSeconds(1)); + send(withTimeout(2, alignmentTimeout), 0, gate); + clock.advanceTime(Duration.ofSeconds(1)); + send(toBuffer(new CancelCheckpointMarker(1L), true), 1, gate); + clock.advanceTime(Duration.ofSeconds(1)); + send(withTimeout(2, alignmentTimeout), 1, gate); + clock.advanceTime(Duration.ofSeconds(1)); + + assertEquals( + Duration.ofSeconds(2).toNanos(), + target.lastAlignmentDurationNanos.get().longValue()); + } + private void testBarrierHandling(CheckpointType checkpointType) throws Exception { final long barrierId = 123L; ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java index 654fd6e..d0618bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java @@ -345,6 +345,32 @@ public class CheckpointBarrierTrackerTest { } @Test + public void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception { + BufferOrEvent[] sequence = { + // start checkpoint 1 + createBarrier(1, 1), + // start checkpoint 2(just suppose checkpoint 1 was canceled) + createBarrier(2, 1), + // cancellation barrier of checkpoint 1 + createCancellationBarrier(1, 0), + // finish the checkpoint 2 + createBarrier(2, 0) + }; + + ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(); + ManualClock manualClock = new ManualClock(); + inputGate = createCheckpointedInputGate(2, sequence, validator, manualClock); + + for (BufferOrEvent boe : sequence) { + assertEquals(boe, inputGate.pollNext().get()); + manualClock.advanceTime(Duration.ofSeconds(1)); + } + assertEquals( + Duration.ofSeconds(2).toNanos(), + validator.lastAlignmentDurationNanos.get().longValue()); + } + + @Test public void testSingleChannelAbortCheckpoint() throws Exception { BufferOrEvent[] sequence = { createBuffer(0),
