This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7281f2b73c50c2c6c90894e67f2b78e61986b4f7
Author: Yun Gao <[email protected]>
AuthorDate: Mon May 31 20:40:25 2021 +0800

    [hotfix] Refactor the initialization of StreamTask to expose CheckpointBarrierHandler
---
 .../io/checkpointing/InputProcessorUtil.java       | 54 ----------------------
 .../runtime/tasks/MultipleInputStreamTask.java     |  5 ++
 .../runtime/tasks/OneInputStreamTask.java          | 45 ++++++++++++++----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  4 ++
 .../runtime/tasks/TwoInputStreamTask.java          | 30 ++++++++++--
 .../io/checkpointing/InputProcessorUtilTest.java   | 19 +++++---
 6 files changed, 82 insertions(+), 75 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
index 16c42b4..da1d2a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
@@ -36,12 +36,9 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -55,57 +52,6 @@ import java.util.stream.Stream;
  */
 @Internal
 public class InputProcessorUtil {
-    @SuppressWarnings("unchecked")
-    public static CheckpointedInputGate createCheckpointedInputGate(
-            AbstractInvokable toNotifyOnCheckpoint,
-            StreamConfig config,
-            SubtaskCheckpointCoordinator checkpointCoordinator,
-            IndexedInputGate[] inputGates,
-            TaskIOMetricGroup taskIOMetricGroup,
-            String taskName,
-            MailboxExecutor mailboxExecutor,
-            TimerService timerService) {
-        CheckpointedInputGate[] checkpointedInputGates =
-                createCheckpointedMultipleInputGate(
-                        toNotifyOnCheckpoint,
-                        config,
-                        checkpointCoordinator,
-                        taskIOMetricGroup,
-                        taskName,
-                        mailboxExecutor,
-                        new List[] {Arrays.asList(inputGates)},
-                        Collections.emptyList(),
-                        timerService);
-        return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates));
-    }
-
-    /**
-     * @return an array of {@link CheckpointedInputGate} created for corresponding {@link
-     *     InputGate}s supplied as parameters.
-     */
-    public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(
-            AbstractInvokable toNotifyOnCheckpoint,
-            StreamConfig config,
-            SubtaskCheckpointCoordinator checkpointCoordinator,
-            TaskIOMetricGroup taskIOMetricGroup,
-            String taskName,
-            MailboxExecutor mailboxExecutor,
-            List<IndexedInputGate>[] inputGates,
-            List<StreamTaskSourceInput<?>> sourceInputs,
-            TimerService timerService) {
-        CheckpointBarrierHandler barrierHandler =
-                createCheckpointBarrierHandler(
-                        toNotifyOnCheckpoint,
-                        config,
-                        checkpointCoordinator,
-                        taskName,
-                        inputGates,
-                        sourceInputs,
-                        mailboxExecutor,
-                        timerService);
-        return createCheckpointedMultipleInputGate(
-                mailboxExecutor, inputGates, taskIOMetricGroup, barrierHandler, config);
-    }
 
     public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(
             MailboxExecutor mailboxExecutor,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 20d775a..1ec5906 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.function.Function;
@@ -175,6 +176,10 @@ public class MultipleInputStreamTask<OUT>
                         getEnvironment().getTaskInfo());
     }
 
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
+        return Optional.ofNullable(checkpointBarrierHandler);
+    }
+
     @Override
     public Future<Boolean> triggerCheckpointAsync(
             CheckpointMetaData metadata, CheckpointOptions options) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index c1296f3..c114ceb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
 import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInputFactory;
+import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
@@ -43,8 +44,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
+import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables;
+
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
 import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -53,6 +61,8 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 
+    @Nullable private CheckpointBarrierHandler checkpointBarrierHandler;
+
     private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
 
     /**
@@ -117,6 +127,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
                 .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
     }
 
+    @Override
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
+        return Optional.ofNullable(checkpointBarrierHandler);
+    }
+
     private StreamTaskInput<IN> wrapWithSorted(StreamTaskInput<IN> input) {
         ClassLoader userCodeClassLoader = getUserCodeClassLoader();
         return new SortingDataInput<>(
@@ -133,18 +148,30 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
                 this);
     }
 
+    @SuppressWarnings("unchecked")
     private CheckpointedInputGate createCheckpointedInputGate() {
         IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
 
-        return InputProcessorUtil.createCheckpointedInputGate(
-                this,
-                configuration,
-                getCheckpointCoordinator(),
-                inputGates,
-                getEnvironment().getMetricGroup().getIOMetricGroup(),
-                getTaskNameWithSubtaskAndId(),
-                mainMailboxExecutor,
-                systemTimerService);
+        checkpointBarrierHandler =
+                InputProcessorUtil.createCheckpointBarrierHandler(
+                        this,
+                        configuration,
+                        getCheckpointCoordinator(),
+                        getTaskNameWithSubtaskAndId(),
+                        new List[] {Arrays.asList(inputGates)},
+                        Collections.emptyList(),
+                        mainMailboxExecutor,
+                        systemTimerService);
+
+        CheckpointedInputGate[] checkpointedInputGates =
+                InputProcessorUtil.createCheckpointedMultipleInputGate(
+                        mainMailboxExecutor,
+                        new List[] {Arrays.asList(inputGates)},
+                        getEnvironment().getMetricGroup().getIOMetricGroup(),
+                        checkpointBarrierHandler,
+                        configuration);
+
+        return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates));
     }
 
     private DataOutput<IN> createDataOutput(Counter numRecordsIn) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 23fa66f..29ad693 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1012,6 +1012,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         }
     }
 
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
+        return Optional.empty();
+    }
+
     @Override
     public void triggerCheckpointOnBarrier(
             CheckpointMetaData checkpointMetaData,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 6f15b8b..87e9ee6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -22,12 +22,16 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory;
+import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -39,28 +43,44 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> {
 
+    @Nullable private CheckpointBarrierHandler checkpointBarrierHandler;
+
     public TwoInputStreamTask(Environment env) throws Exception {
         super(env);
     }
 
     @Override
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
+        return Optional.ofNullable(checkpointBarrierHandler);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
     protected void createInputProcessor(
             List<IndexedInputGate> inputGates1,
             List<IndexedInputGate> inputGates2,
             Function<Integer, StreamPartitioner<?>> gatePartitioners) {
 
         // create an input instance for each input
-        CheckpointedInputGate[] checkpointedInputGates =
-                InputProcessorUtil.createCheckpointedMultipleInputGate(
+        checkpointBarrierHandler =
+                InputProcessorUtil.createCheckpointBarrierHandler(
                         this,
-                        getConfiguration(),
+                        configuration,
                         getCheckpointCoordinator(),
-                        getEnvironment().getMetricGroup().getIOMetricGroup(),
                         getTaskNameWithSubtaskAndId(),
-                        mainMailboxExecutor,
                         new List[] {inputGates1, inputGates2},
                         Collections.emptyList(),
+                        mainMailboxExecutor,
                         systemTimerService);
+
+        CheckpointedInputGate[] checkpointedInputGates =
+                InputProcessorUtil.createCheckpointedMultipleInputGate(
+                        mainMailboxExecutor,
+                        new List[] {inputGates1, inputGates2},
+                        getEnvironment().getMetricGroup().getIOMetricGroup(),
+                        checkpointBarrierHandler,
+                        configuration);
+
         checkState(checkpointedInputGates.length == 2);
 
         inputProcessor =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
index ff4cdbf..76b67e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
@@ -69,24 +69,29 @@ public class InputProcessorUtilTest {
                         Collections.singletonList(getGate(0, 2)),
                     };
 
-            CheckpointedInputGate[] checkpointedMultipleInputGate =
-                    InputProcessorUtil.createCheckpointedMultipleInputGate(
+            CheckpointBarrierHandler barrierHandler =
+                    InputProcessorUtil.createCheckpointBarrierHandler(
                             streamTask,
                             streamConfig,
                             new TestSubtaskCheckpointCoordinator(new MockChannelStateWriter()),
-                            environment.getMetricGroup().getIOMetricGroup(),
                             streamTask.getName(),
-                            new SyncMailboxExecutor(),
                             inputGates,
                             Collections.emptyList(),
+                            new SyncMailboxExecutor(),
                             new TestProcessingTimeService());
+
+            CheckpointedInputGate[] checkpointedMultipleInputGate =
+                    InputProcessorUtil.createCheckpointedMultipleInputGate(
+                            new SyncMailboxExecutor(),
+                            inputGates,
+                            environment.getMetricGroup().getIOMetricGroup(),
+                            barrierHandler,
+                            streamConfig);
+
             for (CheckpointedInputGate checkpointedInputGate : checkpointedMultipleInputGate) {
                 registry.registerCloseable(checkpointedInputGate);
             }
 
-            CheckpointBarrierHandler barrierHandler =
-                    checkpointedMultipleInputGate[0].getCheckpointBarrierHandler();
-
             List<IndexedInputGate> allInputGates =
                     Arrays.stream(inputGates)
                             .flatMap(gates -> gates.stream())

Reply via email to