This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb3eeb187bf520e98d950fdc7b5eee1fa3648a8a Author: Arvid Heise <[email protected]> AuthorDate: Mon Jun 15 21:19:36 2020 +0200 [FLINK-18094][network] Using lists instead of collections of gates while creating checkpoint handlers. The actual implementation have been lists all along and we assume ordering anyways. --- .../flink/streaming/runtime/io/InputGateUtil.java | 19 +++++-------------- .../streaming/runtime/io/InputProcessorUtil.java | 7 +------ .../runtime/tasks/AbstractTwoInputStreamTask.java | 5 ++--- .../runtime/tasks/MultipleInputStreamTask.java | 3 +-- .../streaming/runtime/tasks/TwoInputStreamTask.java | 6 +++--- .../streaming/runtime/io/InputProcessorUtilTest.java | 3 +-- 6 files changed, 13 insertions(+), 30 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java index 4458ac0..abfab89 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java @@ -22,8 +22,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -34,22 +32,15 @@ import java.util.List; @Internal public class InputGateUtil { - public static InputGate createInputGate(Collection<IndexedInputGate> inputGates1, Collection<IndexedInputGate> inputGates2) { - List<IndexedInputGate> gates = new ArrayList<>(inputGates1.size() + inputGates2.size()); - gates.addAll(inputGates1); - gates.addAll(inputGates2); - return createInputGate(gates.toArray(new IndexedInputGate[gates.size()])); - } - - public static InputGate createInputGate(IndexedInputGate[] inputGates) { - if (inputGates.length <= 0) { + public static InputGate createInputGate(List<IndexedInputGate> inputGates) { + if (inputGates.size() <= 0) { throw new RuntimeException("No such input gate."); } - if (inputGates.length < 2) { - return inputGates[0]; + if (inputGates.size() == 1) { + return inputGates.get(0); } else { - return new UnionInputGate(inputGates); + return new UnionInputGate(inputGates.toArray(new IndexedInputGate[0])); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 40796f5..762761b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -69,12 +69,7 @@ public class InputProcessorUtil { SubtaskCheckpointCoordinator checkpointCoordinator, TaskIOMetricGroup taskIOMetricGroup, String taskName, - Collection<IndexedInputGate> ...inputGates) { - - InputGate[] unionedInputGates = new InputGate[inputGates.length]; - for (int i = 0; i < inputGates.length; i++) { - unionedInputGates[i] = InputGateUtil.createInputGate(inputGates[i].toArray(new IndexedInputGate[0])); - } + List<IndexedInputGate>... inputGates) { IntStream numberOfInputChannelsPerGate = Arrays diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java index a3cf4bf..145d5bc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java @@ -29,7 +29,6 @@ import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -95,8 +94,8 @@ public abstract class AbstractTwoInputStreamTask<IN1, IN2, OUT> extends StreamTa } protected abstract void createInputProcessor( - Collection<IndexedInputGate> inputGates1, - Collection<IndexedInputGate> inputGates2, + List<IndexedInputGate> inputGates1, + List<IndexedInputGate> inputGates2, TypeSerializer<IN1> inputDeserializer1, TypeSerializer<IN2> inputDeserializer2) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 1578323..4bb185d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -34,7 +34,6 @@ import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import static org.apache.flink.util.Preconditions.checkState; @@ -84,7 +83,7 @@ public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputS } protected void createInputProcessor( - Collection<IndexedInputGate>[] inputGates, + List<IndexedInputGate>[] inputGates, TypeSerializer<?>[] inputDeserializers, WatermarkGauge[] inputWatermarkGauges) { MultipleInputSelectionHandler selectionHandler = new MultipleInputSelectionHandler( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 88d1246..625cf2a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -28,7 +28,7 @@ import org.apache.flink.streaming.runtime.io.InputProcessorUtil; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; import org.apache.flink.streaming.runtime.io.TwoInputSelectionHandler; -import java.util.Collection; +import java.util.List; import static org.apache.flink.util.Preconditions.checkState; @@ -45,8 +45,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas @Override protected void createInputProcessor( - Collection<IndexedInputGate> inputGates1, - Collection<IndexedInputGate> inputGates2, + List<IndexedInputGate> inputGates1, + List<IndexedInputGate> inputGates2, TypeSerializer<IN1> inputDeserializer1, TypeSerializer<IN2> inputDeserializer2) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java index 37a55d6..a17ead0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java @@ -39,7 +39,6 @@ import org.apache.flink.streaming.util.MockStreamTaskBuilder; import org.junit.Test; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -89,7 +88,7 @@ public class InputProcessorUtilTest { streamConfig.setUnalignedCheckpointsEnabled(true); // First input gate has index larger than the second - Collection<IndexedInputGate>[] inputGates = new Collection[] { + List<IndexedInputGate>[] inputGates = new List[] { Collections.singletonList(new MockIndexedInputGate(1, 4)), Collections.singletonList(new MockIndexedInputGate(0, 2)), };
