This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33b8600e3f6626e4ec2952dcd7a3d56d0ac57c1b
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Jun 15 21:16:16 2020 +0200

    [FLINK-18094][network] Simplifying InputProcessorUtil by delegating createCheckpointedInputGate to createCheckpointedMultipleInputGate.
---
 .../streaming/runtime/io/InputProcessorUtil.java | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index b397b9e..40796f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
@@ -39,7 +41,7 @@ import java.util.stream.IntStream;
  */
 @Internal
 public class InputProcessorUtil {
-
+	@SuppressWarnings("unchecked")
 	public static CheckpointedInputGate createCheckpointedInputGate(
 			AbstractInvokable toNotifyOnCheckpoint,
 			StreamConfig config,
@@ -47,20 +49,14 @@ public class InputProcessorUtil {
 			IndexedInputGate[] inputGates,
 			TaskIOMetricGroup taskIOMetricGroup,
 			String taskName) {
-		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-		CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+		CheckpointedInputGate[] checkpointedInputGates = createCheckpointedMultipleInputGate(
+			toNotifyOnCheckpoint,
 			config,
-			Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels),
 			checkpointCoordinator,
+			taskIOMetricGroup,
 			taskName,
-			generateChannelIndexToInputGateMap(inputGate),
-			generateInputGateToChannelIndexOffsetMap(inputGate),
-			toNotifyOnCheckpoint);
-		registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
-
-		barrierHandler.getBufferReceivedListener().ifPresent(inputGate::registerBufferReceivedListener);
-
-		return new CheckpointedInputGate(inputGate, barrierHandler);
+			Arrays.asList(inputGates));
+		return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates));
 	}
 
 	/**
