This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb3eeb187bf520e98d950fdc7b5eee1fa3648a8a
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Jun 15 21:19:36 2020 +0200

    [FLINK-18094][network] Using lists instead of collections of gates while 
creating checkpoint handlers.
    
    The actual implementations have been lists all along and we assume ordering 
anyway.
---
 .../flink/streaming/runtime/io/InputGateUtil.java     | 19 +++++--------------
 .../streaming/runtime/io/InputProcessorUtil.java      |  7 +------
 .../runtime/tasks/AbstractTwoInputStreamTask.java     |  5 ++---
 .../runtime/tasks/MultipleInputStreamTask.java        |  3 +--
 .../streaming/runtime/tasks/TwoInputStreamTask.java   |  6 +++---
 .../streaming/runtime/io/InputProcessorUtilTest.java  |  3 +--
 6 files changed, 13 insertions(+), 30 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
index 4458ac0..abfab89 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
@@ -22,8 +22,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -34,22 +32,15 @@ import java.util.List;
 @Internal
 public class InputGateUtil {
 
-       public static InputGate createInputGate(Collection<IndexedInputGate> 
inputGates1, Collection<IndexedInputGate> inputGates2) {
-               List<IndexedInputGate> gates = new 
ArrayList<>(inputGates1.size() + inputGates2.size());
-               gates.addAll(inputGates1);
-               gates.addAll(inputGates2);
-               return createInputGate(gates.toArray(new 
IndexedInputGate[gates.size()]));
-       }
-
-       public static InputGate createInputGate(IndexedInputGate[] inputGates) {
-               if (inputGates.length <= 0) {
+       public static InputGate createInputGate(List<IndexedInputGate> 
inputGates) {
+               if (inputGates.size() <= 0) {
                        throw new RuntimeException("No such input gate.");
                }
 
-               if (inputGates.length < 2) {
-                       return inputGates[0];
+               if (inputGates.size() == 1) {
+                       return inputGates.get(0);
                } else {
-                       return new UnionInputGate(inputGates);
+                       return new UnionInputGate(inputGates.toArray(new 
IndexedInputGate[0]));
                }
        }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 40796f5..762761b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -69,12 +69,7 @@ public class InputProcessorUtil {
                        SubtaskCheckpointCoordinator checkpointCoordinator,
                        TaskIOMetricGroup taskIOMetricGroup,
                        String taskName,
-                       Collection<IndexedInputGate> ...inputGates) {
-
-               InputGate[] unionedInputGates = new 
InputGate[inputGates.length];
-               for (int i = 0; i < inputGates.length; i++) {
-                       unionedInputGates[i] = 
InputGateUtil.createInputGate(inputGates[i].toArray(new IndexedInputGate[0]));
-               }
+                       List<IndexedInputGate>... inputGates) {
 
                IntStream numberOfInputChannelsPerGate =
                        Arrays
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
index a3cf4bf..145d5bc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -95,8 +94,8 @@ public abstract class AbstractTwoInputStreamTask<IN1, IN2, 
OUT> extends StreamTa
        }
 
        protected abstract void createInputProcessor(
-               Collection<IndexedInputGate> inputGates1,
-               Collection<IndexedInputGate> inputGates2,
+               List<IndexedInputGate> inputGates1,
+               List<IndexedInputGate> inputGates2,
                TypeSerializer<IN1> inputDeserializer1,
                TypeSerializer<IN2> inputDeserializer2) throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 1578323..4bb185d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -84,7 +83,7 @@ public class MultipleInputStreamTask<OUT> extends 
StreamTask<OUT, MultipleInputS
        }
 
        protected void createInputProcessor(
-                       Collection<IndexedInputGate>[] inputGates,
+                       List<IndexedInputGate>[] inputGates,
                        TypeSerializer<?>[] inputDeserializers,
                        WatermarkGauge[] inputWatermarkGauges) {
                MultipleInputSelectionHandler selectionHandler = new 
MultipleInputSelectionHandler(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 88d1246..625cf2a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
 import org.apache.flink.streaming.runtime.io.TwoInputSelectionHandler;
 
-import java.util.Collection;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -45,8 +45,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
AbstractTwoInputStreamTas
 
        @Override
        protected void createInputProcessor(
-               Collection<IndexedInputGate> inputGates1,
-               Collection<IndexedInputGate> inputGates2,
+               List<IndexedInputGate> inputGates1,
+               List<IndexedInputGate> inputGates2,
                TypeSerializer<IN1> inputDeserializer1,
                TypeSerializer<IN2> inputDeserializer2) {
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
index 37a55d6..a17ead0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -89,7 +88,7 @@ public class InputProcessorUtilTest {
                        streamConfig.setUnalignedCheckpointsEnabled(true);
 
                        // First input gate has index larger than the second
-                       Collection<IndexedInputGate>[] inputGates = new 
Collection[] {
+                       List<IndexedInputGate>[] inputGates = new List[] {
                                Collections.singletonList(new 
MockIndexedInputGate(1, 4)),
                                Collections.singletonList(new 
MockIndexedInputGate(0, 2)),
                        };

Reply via email to