This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10b7afae7423d75f94f397699b09deb9fbbdaca5
Author: fanrui <[email protected]>
AuthorDate: Fri May 20 17:36:16 2022 +0800

    [FLINK-27251][checkpoint] Refactor the barrier alignment timer and default priority sequence number
---
 .../network/partition/PipelinedSubpartition.java   | 10 +--
 .../io/checkpointing/BarrierAlignmentUtil.java     | 74 ++++++++++++++++++++++
 .../io/checkpointing/CheckpointBarrierHandler.java |  5 --
 .../io/checkpointing/InputProcessorUtil.java       | 23 +------
 .../SingleCheckpointBarrierHandler.java            | 23 +++----
 .../checkpointing/AlternatingCheckpointsTest.java  | 11 ++--
 .../checkpointing/TestBarrierHandlerFactory.java   | 10 +--
 7 files changed, 97 insertions(+), 59 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 7cfb582127c..125e38fe34d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -71,6 +71,8 @@ public class PipelinedSubpartition extends ResultSubpartition
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedSubpartition.class);
 
+    private static final int DEFAULT_PRIORITY_SEQUENCE_NUMBER = -1;
+
     // ------------------------------------------------------------------------
 
     /**
@@ -171,7 +173,7 @@ public class PipelinedSubpartition extends 
ResultSubpartition
         checkNotNull(bufferConsumer);
 
         final boolean notifyDataAvailable;
-        int prioritySequenceNumber = -1;
+        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
         int newBufferSize;
         synchronized (buffers) {
             if (isFinished || isReleased) {
@@ -191,9 +193,7 @@ public class PipelinedSubpartition extends 
ResultSubpartition
             newBufferSize = bufferSize;
         }
 
-        if (prioritySequenceNumber != -1) {
-            notifyPriorityEvent(prioritySequenceNumber);
-        }
+        notifyPriorityEvent(prioritySequenceNumber);
         if (notifyDataAvailable) {
             notifyDataAvailable();
         }
@@ -593,7 +593,7 @@ public class PipelinedSubpartition extends 
ResultSubpartition
 
     private void notifyPriorityEvent(int prioritySequenceNumber) {
         final PipelinedSubpartitionView readView = this.readView;
-        if (readView != null) {
+        if (readView != null && prioritySequenceNumber != 
DEFAULT_PRIORITY_SEQUENCE_NUMBER) {
             readView.notifyPriorityEvent(prioritySequenceNumber);
         }
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
new file mode 100644
index 00000000000..85e5c701e39
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.checkpointing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+import org.apache.flink.util.clock.Clock;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+
+/** Utility for barrier alignment. */
+@Internal
+public class BarrierAlignmentUtil {
+
+    public static long getTimerDelay(Clock clock, CheckpointBarrier 
announcedBarrier) {
+        long alignedCheckpointTimeout =
+                
announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout();
+        long timePassedSinceCheckpointStart =
+                clock.absoluteTimeMillis() - announcedBarrier.getTimestamp();
+
+        return Math.max(alignedCheckpointTimeout - 
timePassedSinceCheckpointStart, 0);
+    }
+
+    public static DelayableTimer createRegisterTimerCallback(
+            MailboxExecutor mailboxExecutor, TimerService timerService) {
+        return (callable, delay) -> {
+            ScheduledFuture<?> scheduledFuture =
+                    timerService.registerTimer(
+                            timerService.getCurrentProcessingTime() + 
delay.toMillis(),
+                            timestamp ->
+                                    mailboxExecutor.submit(
+                                            callable,
+                                            "Execute checkpoint barrier 
handler delayed action"));
+            return () -> scheduledFuture.cancel(false);
+        };
+    }
+
+    /** It can register a task to be executed some time later. */
+    public interface DelayableTimer {
+
+        /**
+         * Register a task to be executed some time later.
+         *
+         * @param callable the task to submit
+         * @param delay how long after the delay to execute the task
+         * @return the Cancellable, it can cancel the task.
+         */
+        Cancellable registerTask(Callable<?> callable, Duration delay);
+    }
+
+    /** A handle to a delayed action which can be cancelled. */
+    public interface Cancellable {
+        void cancel();
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
index 008e5eaddf1..102d4f2149c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
@@ -212,9 +212,4 @@ public abstract class CheckpointBarrierHandler implements 
Closeable {
     protected final Clock getClock() {
         return clock;
     }
-
-    /** A handle to a delayed action which can be cancelled. */
-    interface Cancellable {
-        void cancel();
-    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
index efe61fde8c4..25ecf1fa7af 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
@@ -30,20 +30,15 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.io.InputGateUtil;
 import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
 import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
-import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
 import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledFuture;
-import java.util.function.BiFunction;
 import java.util.stream.Stream;
 
 /**
@@ -160,7 +155,7 @@ public class InputProcessorUtil {
                     checkpointCoordinator,
                     clock,
                     numberOfChannels,
-                    createRegisterTimerCallback(mailboxExecutor, timerService),
+                    
BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService),
                     enableCheckpointAfterTasksFinished,
                     inputs);
         } else {
@@ -169,26 +164,12 @@ public class InputProcessorUtil {
                     toNotifyOnCheckpoint,
                     clock,
                     numberOfChannels,
-                    createRegisterTimerCallback(mailboxExecutor, timerService),
+                    
BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService),
                     enableCheckpointAfterTasksFinished,
                     inputs);
         }
     }
 
-    private static BiFunction<Callable<?>, Duration, Cancellable> 
createRegisterTimerCallback(
-            MailboxExecutor mailboxExecutor, TimerService timerService) {
-        return (callable, delay) -> {
-            ScheduledFuture<?> scheduledFuture =
-                    timerService.registerTimer(
-                            timerService.getCurrentProcessingTime() + 
delay.toMillis(),
-                            timestamp ->
-                                    mailboxExecutor.submit(
-                                            callable,
-                                            "Execute checkpoint barrier 
handler delayed action"));
-            return () -> scheduledFuture.cancel(false);
-        };
-    }
-
     private static void registerCheckpointMetrics(
             TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler 
barrierHandler) {
         taskIOMetricGroup.gauge(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 92e49ab63f8..aaacd64eb63 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.DelayableTimer;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.clock.Clock;
@@ -44,9 +46,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
@@ -67,7 +67,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
 
     private final String taskName;
     private final ControllerImpl context;
-    private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer;
+    private final DelayableTimer registerTimer;
     private final SubtaskCheckpointCoordinator subTaskCheckpointCoordinator;
     private final CheckpointableInput[] inputs;
 
@@ -128,7 +128,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
             SubtaskCheckpointCoordinator checkpointCoordinator,
             Clock clock,
             int numOpenChannels,
-            BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
+            DelayableTimer registerTimer,
             boolean enableCheckpointAfterTasksFinished,
             CheckpointableInput... inputs) {
         return new SingleCheckpointBarrierHandler(
@@ -149,7 +149,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
             CheckpointableTask toNotifyOnCheckpoint,
             Clock clock,
             int numOpenChannels,
-            BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
+            DelayableTimer registerTimer,
             boolean enableCheckpointAfterTasksFinished,
             CheckpointableInput... inputs) {
         return new SingleCheckpointBarrierHandler(
@@ -171,7 +171,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
             SubtaskCheckpointCoordinator checkpointCoordinator,
             Clock clock,
             int numOpenChannels,
-            BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
+            DelayableTimer registerTimer,
             boolean enableCheckpointAfterTasksFinished,
             CheckpointableInput... inputs) {
         return new SingleCheckpointBarrierHandler(
@@ -195,7 +195,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
             int numOpenChannels,
             BarrierHandlerState currentState,
             boolean alternating,
-            BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
+            DelayableTimer registerTimer,
             CheckpointableInput[] inputs,
             boolean enableCheckpointAfterTasksFinished) {
         super(toNotifyOnCheckpoint, clock, enableCheckpointAfterTasksFinished);
@@ -308,15 +308,10 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
     }
 
     private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
-        long alignedCheckpointTimeout =
-                
announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout();
-        long timePassedSinceCheckpointStart =
-                getClock().absoluteTimeMillis() - 
announcedBarrier.getTimestamp();
-
-        long timerDelay = Math.max(alignedCheckpointTimeout - 
timePassedSinceCheckpointStart, 0);
+        long timerDelay = BarrierAlignmentUtil.getTimerDelay(getClock(), 
announcedBarrier);
 
         this.currentAlignmentTimer =
-                registerTimer.apply(
+                registerTimer.registerTask(
                         () -> {
                             long barrierId = announcedBarrier.getId();
                             try {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index 454ffa55c81..2642bbcd73a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
-import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable;
 import org.apache.flink.streaming.util.TestCheckpointedInputGateBuilder;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.ManualClock;
@@ -57,7 +57,6 @@ import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
 
 import static java.util.Collections.singletonList;
 import static junit.framework.TestCase.assertTrue;
@@ -684,8 +683,8 @@ public class AlternatingCheckpointsTest {
         ClockWithDelayedActions clockWithDelayedActions =
                 new ClockWithDelayedActions() {
                     @Override
-                    public Cancellable apply(Callable<?> callable, Duration 
delay) {
-                        super.apply(callable, delay);
+                    public Cancellable registerTask(Callable<?> callable, 
Duration delay) {
+                        super.registerTask(callable, delay);
                         // do not unregister timers on cancel
                         return () -> {};
                     }
@@ -1465,7 +1464,7 @@ public class AlternatingCheckpointsTest {
     }
 
     private static class ClockWithDelayedActions extends Clock
-            implements BiFunction<Callable<?>, Duration, Cancellable> {
+            implements BarrierAlignmentUtil.DelayableTimer {
 
         // must start at least at 100 ms, because ValidatingCheckpointHandler
         // expects barriers to have positive timestamps
@@ -1474,7 +1473,7 @@ public class AlternatingCheckpointsTest {
                 new 
PriorityQueue<>(Comparator.comparingLong(CallableWithTimestamp::getTimestamp));
 
         @Override
-        public Cancellable apply(Callable<?> callable, Duration delay) {
+        public Cancellable registerTask(Callable<?> callable, Duration delay) {
             CallableWithTimestamp callableWithTimestamp =
                     new CallableWithTimestamp(
                             clock.relativeTimeNanos() + delay.toNanos(), 
callable);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java
index 7f0e5ff5c25..1b345ebe0d2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/TestBarrierHandlerFactory.java
@@ -22,20 +22,14 @@ import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable;
 import 
org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
-import java.time.Duration;
-import java.util.concurrent.Callable;
-import java.util.function.BiFunction;
-
 /** A factory for creating instances of {@link SingleCheckpointBarrierHandler} 
for tests. */
 public class TestBarrierHandlerFactory {
     private final AbstractInvokable target;
-    private BiFunction<Callable<?>, Duration, Cancellable> actionRegistration =
-            (callable, delay) -> () -> {};
+    private BarrierAlignmentUtil.DelayableTimer actionRegistration = 
(callable, delay) -> () -> {};
     private Clock clock = SystemClock.getInstance();
     private boolean enableCheckpointsAfterTasksFinish = true;
 
@@ -48,7 +42,7 @@ public class TestBarrierHandlerFactory {
     }
 
     public TestBarrierHandlerFactory withActionRegistration(
-            BiFunction<Callable<?>, Duration, Cancellable> actionRegistration) 
{
+            BarrierAlignmentUtil.DelayableTimer actionRegistration) {
         this.actionRegistration = actionRegistration;
         return this;
     }

Reply via email to