This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33b8600e3f6626e4ec2952dcd7a3d56d0ac57c1b
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Jun 15 21:16:16 2020 +0200

    [FLINK-18094][network] Simplifying InputProcessorUtil by delegating createCheckpointedInputGate to createCheckpointedMultipleInputGate.
---
 .../streaming/runtime/io/InputProcessorUtil.java     | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index b397b9e..40796f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
@@ -39,7 +41,7 @@ import java.util.stream.IntStream;
  */
 @Internal
 public class InputProcessorUtil {
-
+       @SuppressWarnings("unchecked")
        public static CheckpointedInputGate createCheckpointedInputGate(
                        AbstractInvokable toNotifyOnCheckpoint,
                        StreamConfig config,
@@ -47,20 +49,14 @@ public class InputProcessorUtil {
                        IndexedInputGate[] inputGates,
                        TaskIOMetricGroup taskIOMetricGroup,
                        String taskName) {
-               InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-               CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+               CheckpointedInputGate[] checkpointedInputGates = createCheckpointedMultipleInputGate(
+                       toNotifyOnCheckpoint,
                        config,
-                       Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels),
                        checkpointCoordinator,
+                       taskIOMetricGroup,
                        taskName,
-                       generateChannelIndexToInputGateMap(inputGate),
-                       generateInputGateToChannelIndexOffsetMap(inputGate),
-                       toNotifyOnCheckpoint);
-               registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
-
-               barrierHandler.getBufferReceivedListener().ifPresent(inputGate::registerBufferReceivedListener);
-
-               return new CheckpointedInputGate(inputGate, barrierHandler);
+                       Arrays.asList(inputGates));
+               return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates));
        }
 
        /**

Reply via email to