This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 10b7afae7423d75f94f397699b09deb9fbbdaca5 Author: fanrui <[email protected]> AuthorDate: Fri May 20 17:36:16 2022 +0800 [FLINK-27251][checkpoint] Refactor the barrier alignment timer and default priority sequence number --- .../network/partition/PipelinedSubpartition.java | 10 +-- .../io/checkpointing/BarrierAlignmentUtil.java | 74 ++++++++++++++++++++++ .../io/checkpointing/CheckpointBarrierHandler.java | 5 -- .../io/checkpointing/InputProcessorUtil.java | 23 +------ .../SingleCheckpointBarrierHandler.java | 23 +++---- .../checkpointing/AlternatingCheckpointsTest.java | 11 ++-- .../checkpointing/TestBarrierHandlerFactory.java | 10 +-- 7 files changed, 97 insertions(+), 59 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 7cfb582127c..125e38fe34d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -71,6 +71,8 @@ public class PipelinedSubpartition extends ResultSubpartition private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class); + private static final int DEFAULT_PRIORITY_SEQUENCE_NUMBER = -1; + // ------------------------------------------------------------------------ /** @@ -171,7 +173,7 @@ public class PipelinedSubpartition extends ResultSubpartition checkNotNull(bufferConsumer); final boolean notifyDataAvailable; - int prioritySequenceNumber = -1; + int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER; int newBufferSize; synchronized (buffers) { if (isFinished || isReleased) { @@ -191,9 +193,7 @@ public class PipelinedSubpartition extends ResultSubpartition newBufferSize = bufferSize; } - if (prioritySequenceNumber != -1) { - notifyPriorityEvent(prioritySequenceNumber); - } + notifyPriorityEvent(prioritySequenceNumber); if (notifyDataAvailable) { notifyDataAvailable(); } @@ -593,7 +593,7 @@ public class PipelinedSubpartition extends ResultSubpartition private void notifyPriorityEvent(int prioritySequenceNumber) { final PipelinedSubpartitionView readView = this.readView; - if (readView != null) { + if (readView != null && prioritySequenceNumber != DEFAULT_PRIORITY_SEQUENCE_NUMBER) { readView.notifyPriorityEvent(prioritySequenceNumber); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java new file mode 100644 index 00000000000..85e5c701e39 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.checkpointing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.runtime.tasks.TimerService; +import org.apache.flink.util.clock.Clock; + +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; + +/** Utility for barrier alignment. */ +@Internal +public class BarrierAlignmentUtil { + + public static long getTimerDelay(Clock clock, CheckpointBarrier announcedBarrier) { + long alignedCheckpointTimeout = + announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout(); + long timePassedSinceCheckpointStart = + clock.absoluteTimeMillis() - announcedBarrier.getTimestamp(); + + return Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0); + } + + public static DelayableTimer createRegisterTimerCallback( + MailboxExecutor mailboxExecutor, TimerService timerService) { + return (callable, delay) -> { + ScheduledFuture<?> scheduledFuture = + timerService.registerTimer( + timerService.getCurrentProcessingTime() + delay.toMillis(), + timestamp -> + mailboxExecutor.submit( + callable, + "Execute checkpoint barrier handler delayed action")); + return () -> scheduledFuture.cancel(false); + }; + } + + /** It can register a task to be executed some time later. */ + public interface DelayableTimer { + + /** + * Register a task to be executed some time later. + * + * @param callable the task to submit + * @param delay how long after the delay to execute the task + * @return the Cancellable, it can cancel the task. + */ + Cancellable registerTask(Callable<?> callable, Duration delay); + } + + /** A handle to a delayed action which can be cancelled. */ + public interface Cancellable { + void cancel(); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java index 008e5eaddf1..102d4f2149c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java @@ -212,9 +212,4 @@ public abstract class CheckpointBarrierHandler implements Closeable { protected final Clock getClock() { return clock; } - - /** A handle to a delayed action which can be cancelled. */ - interface Cancellable { - void cancel(); - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java index efe61fde8c4..25ecf1fa7af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java @@ -30,20 +30,15 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.io.InputGateUtil; import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor; import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput; -import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable; import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator; import org.apache.flink.streaming.runtime.tasks.TimerService; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; -import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledFuture; -import java.util.function.BiFunction; import java.util.stream.Stream; /** @@ -160,7 +155,7 @@ public class InputProcessorUtil { checkpointCoordinator, clock, numberOfChannels, - createRegisterTimerCallback(mailboxExecutor, timerService), + BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService), enableCheckpointAfterTasksFinished, inputs); } else { @@ -169,26 +164,12 @@ public class InputProcessorUtil { toNotifyOnCheckpoint, clock, numberOfChannels, - createRegisterTimerCallback(mailboxExecutor, timerService), + BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService), enableCheckpointAfterTasksFinished, inputs); } } - private static BiFunction<Callable<?>, Duration, Cancellable> createRegisterTimerCallback( - MailboxExecutor mailboxExecutor, TimerService timerService) { - return (callable, delay) -> { - ScheduledFuture<?> scheduledFuture = - timerService.registerTimer( - timerService.getCurrentProcessingTime() + delay.toMillis(), - timestamp -> - mailboxExecutor.submit( - callable, - "Execute checkpoint barrier handler delayed action")); - return () -> scheduledFuture.cancel(false); - }; - } - private static void registerCheckpointMetrics( TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler barrierHandler) { taskIOMetricGroup.gauge( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java index 92e49ab63f8..aaacd64eb63 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput; import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.DelayableTimer; import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.clock.Clock; @@ -44,9 +46,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED; @@ -67,7 +67,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { private final String taskName; private final ControllerImpl context; - private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer; + private final DelayableTimer registerTimer; private final SubtaskCheckpointCoordinator subTaskCheckpointCoordinator; private final CheckpointableInput[] inputs; @@ -128,7 +128,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { SubtaskCheckpointCoordinator checkpointCoordinator, Clock clock, int numOpenChannels, - BiFunction<Callable<?>, Duration, Cancellable> registerTimer, + DelayableTimer registerTimer, boolean enableCheckpointAfterTasksFinished, CheckpointableInput... inputs) { return new SingleCheckpointBarrierHandler( @@ -149,7 +149,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { CheckpointableTask toNotifyOnCheckpoint, Clock clock, int numOpenChannels, - BiFunction<Callable<?>, Duration, Cancellable> registerTimer, + DelayableTimer registerTimer, boolean enableCheckpointAfterTasksFinished, CheckpointableInput... inputs) { return new SingleCheckpointBarrierHandler( @@ -171,7 +171,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { SubtaskCheckpointCoordinator checkpointCoordinator, Clock clock, int numOpenChannels, - BiFunction<Callable<?>, Duration, Cancellable> registerTimer, + DelayableTimer registerTimer, boolean enableCheckpointAfterTasksFinished, CheckpointableInput... inputs) { return new SingleCheckpointBarrierHandler( @@ -195,7 +195,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { int numOpenChannels, BarrierHandlerState currentState, boolean alternating, - BiFunction<Callable<?>, Duration, Cancellable> registerTimer, + DelayableTimer registerTimer, CheckpointableInput[] inputs, boolean enableCheckpointAfterTasksFinished) { super(toNotifyOnCheckpoint, clock, enableCheckpointAfterTasksFinished); @@ -308,15 +308,10 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { } private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) { - long alignedCheckpointTimeout = - announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout(); - long timePassedSinceCheckpointStart = - getClock().absoluteTimeMillis() - announcedBarrier.getTimestamp(); - - long timerDelay = Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0); + long timerDelay = BarrierAlignmentUtil.getTimerDelay(getClock(), announcedBarrier); this.currentAlignmentTimer = - registerTimer.apply( + registerTimer.registerTask( () -> { long barrierId = announcedBarrier.getId(); try { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java index 454ffa55c81..2642bbcd73a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; -import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable; import org.apache.flink.streaming.util.TestCheckpointedInputGateBuilder; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.ManualClock; @@ -57,7 +57,6 @@ import java.util.Optional; import java.util.PriorityQueue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; import static java.util.Collections.singletonList; import static junit.framework.TestCase.assertTrue; @@ -684,8 +683,8 @@ public class AlternatingCheckpointsTest { ClockWithDelayedActions clockWithDelayedActions = new ClockWithDelayedActions() { @Override - public Cancellable apply(Callable<?> callable, Duration delay) { - super.apply(callable, delay); + public Cancellable registerTask(Callable<?> callable, Duration delay) { + super.registerTask(callable, delay); // do not unregister timers on cancel return () -> {}; } @@ -1465,7 +1464,7 @@ public class AlternatingCheckpointsTest { } private static class ClockWithDelayedActions extends Clock - implements BiFunction<Callable<?>, Duration, Cancellable> { + implements BarrierAlignmentUtil.DelayableTimer { // must start at least at 100 ms, because ValidatingCheckpointHandler // expects barriers to have positive timestamps @@ -1474,7 +1473,7 @@ public class AlternatingCheckpointsTest { new PriorityQueue<>(Comparator.comparingLong(CallableWithTimestamp::getTimestamp)); @Override - public Cancellable apply(Callable<?> callable, Duration delay) { + public Cancellable registerTask(Callable<?> callable, Duration delay) { CallableWithTimestamp callableWithTimestamp = new CallableWithTimestamp( clock.relativeTimeNanos() + delay.toNanos(), callable); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java index 7f0e5ff5c25..1b345ebe0d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java @@ -22,20 +22,14 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable; import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; -import java.time.Duration; -import java.util.concurrent.Callable; -import java.util.function.BiFunction; - /** A factory for creating instances of {@link SingleCheckpointBarrierHandler} for tests. */ public class TestBarrierHandlerFactory { private final AbstractInvokable target; - private BiFunction<Callable<?>, Duration, Cancellable> actionRegistration = - (callable, delay) -> () -> {}; + private BarrierAlignmentUtil.DelayableTimer actionRegistration = (callable, delay) -> () -> {}; private Clock clock = SystemClock.getInstance(); private boolean enableCheckpointsAfterTasksFinish = true; @@ -48,7 +42,7 @@ public class TestBarrierHandlerFactory { } public TestBarrierHandlerFactory withActionRegistration( - BiFunction<Callable<?>, Duration, Cancellable> actionRegistration) { + BarrierAlignmentUtil.DelayableTimer actionRegistration) { this.actionRegistration = actionRegistration; return this; }
