rkhachatryan commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r438059312



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
##########
@@ -38,18 +40,18 @@ public static InputGate createInputGate(Collection<IndexedInputGate> inputGates1
                List<IndexedInputGate> gates = new ArrayList<>(inputGates1.size() + inputGates2.size());
                gates.addAll(inputGates1);
                gates.addAll(inputGates2);
-               return createInputGate(gates.toArray(new IndexedInputGate[gates.size()]));
+               return createInputGate(gates);
        }
 
-       public static InputGate createInputGate(IndexedInputGate[] inputGates) {
-               if (inputGates.length <= 0) {
+       public static InputGate createInputGate(Collection<IndexedInputGate> inputGates) {

Review comment:
       Replacing the array with a `Collection` drops the ordering guarantee. 
Why not use `List` instead?
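
A minimal sketch of the concern, using plain strings in place of `IndexedInputGate`. The class and method names here are hypothetical and only illustrate the type-level difference: a `List` parameter promises positional order, while a `Collection` parameter accepts implementations (such as `HashSet`) that make no such promise.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

// Hypothetical illustration, not code from the PR.
class OrderingSketch {

    // With a List parameter, element i of the result is the i-th gate passed in.
    static String[] toArrayInOrder(List<String> gates) {
        return gates.toArray(new String[0]);
    }

    // With a Collection parameter, the result follows whatever iteration
    // order the concrete implementation happens to provide.
    static String[] toArrayAnyOrder(Collection<String> gates) {
        return gates.toArray(new String[0]);
    }

    public static void main(String[] args) {
        List<String> gates = Arrays.asList("gate-2", "gate-0", "gate-1");

        // Order is preserved because List defines positional access.
        System.out.println(Arrays.toString(toArrayInOrder(gates)));

        // A HashSet is a valid Collection, but its iteration order is unspecified.
        System.out.println(Arrays.toString(toArrayAnyOrder(new HashSet<>(gates))));
    }
}
```

Callers that already hold a `List` lose nothing with the narrower signature, and the compiler then rejects unordered inputs.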

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -28,39 +29,28 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.IntStream;
 
 /**
  * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
  * for {@link StreamOneInputProcessor} and {@link StreamTwoInputProcessor}.
  */
 @Internal
 public class InputProcessorUtil {
-
+       @SuppressWarnings("unchecked")
        public static CheckpointedInputGate createCheckpointedInputGate(
                        AbstractInvokable toNotifyOnCheckpoint,
                        StreamConfig config,
                        SubtaskCheckpointCoordinator checkpointCoordinator,
                        IndexedInputGate[] inputGates,
                        TaskIOMetricGroup taskIOMetricGroup,
                        String taskName) {
-               InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-               CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+               return createCheckpointedMultipleInputGate(
+                       toNotifyOnCheckpoint,
                        config,
-                       Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels),
                        checkpointCoordinator,
+                       taskIOMetricGroup,
                        taskName,
-                       generateChannelIndexToInputGateMap(inputGate),
-                       generateInputGateToChannelIndexOffsetMap(inputGate),
-                       toNotifyOnCheckpoint);
-               registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
-
-               barrierHandler.getBufferReceivedListener().ifPresent(inputGate::registerBufferReceivedListener);
-
-               return new CheckpointedInputGate(inputGate, barrierHandler);
+                       Arrays.asList(inputGates))[0];

Review comment:
       Can we add a check that the returned value is a single-element array, 
to prevent future bugs?
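
Something along these lines, as a rough sketch. `getOnlyElement` is a hypothetical helper written with plain JDK types; in the actual code a precondition utility already used in the codebase would probably be the more natural fit than a hand-rolled exception.

```java
// Hypothetical illustration, not code from the PR.
class SingleResultSketch {

    // Returns the sole element, failing fast if the array does not
    // contain exactly one element.
    static <T> T getOnlyElement(T[] results) {
        if (results == null || results.length != 1) {
            String actual = results == null ? "null" : String.valueOf(results.length);
            throw new IllegalStateException(
                "Expected exactly one element, but got: " + actual);
        }
        return results[0];
    }

    public static void main(String[] args) {
        // Happy path: exactly one element is returned as-is.
        System.out.println(getOnlyElement(new String[] {"checkpointed-gate"}));

        // Any other length is rejected instead of silently taking index 0.
        try {
            getOnlyElement(new String[] {"gate-a", "gate-b"});
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

That way a later change that makes the delegate return several gates fails loudly at this call site rather than dropping all but the first.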

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
        private final ThreadSafeUnaligner threadSafeUnaligner;
 
        CheckpointBarrierUnaligner(
-                       int[] numberOfInputChannelsPerGate,
                        SubtaskCheckpointCoordinator checkpointCoordinator,
                        String taskName,
-                       AbstractInvokable toNotifyOnCheckpoint) {
+                       AbstractInvokable toNotifyOnCheckpoint,
+                       IndexedInputGate... inputGates) {
                super(toNotifyOnCheckpoint);
 
                this.taskName = taskName;
-
-               final int numGates = numberOfInputChannelsPerGate.length;
-
-               gateChannelOffsets = new int[numGates];
-               for (int index = 1; index < numGates; index++) {
-                       gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
-               }
-
-               final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1];
-               hasInflightBuffers = new boolean[totalNumChannels];
-
-               channelInfos = IntStream.range(0, numGates)
-                       .mapToObj(gateIndex -> IntStream.range(0, numberOfInputChannelsPerGate[gateIndex])
-                               .mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex)))
-                       .flatMap(Function.identity())
+               this.channelInfos = Arrays.stream(inputGates)
+                       .flatMap(gate -> gate.getChannels().stream().map(InputChannel::getChannelInfo))
                        .toArray(InputChannelInfo[]::new);
-
-               threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(checkpointCoordinator), this);
+               hasInflightBuffers = new boolean[channelInfos.length];
+               threadSafeUnaligner = new ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator), this);
+
+               gateChannelOffsets = new int[inputGates.length];
+               int offset = 0;
+               for (final IndexedInputGate gate: inputGates) {
+                       gateChannelOffsets[gate.getGateIndex()] = offset;
+                       offset += gate.getNumberOfInputChannels();
+               }

Review comment:
       I see similar code in the `CheckpointBarrierAligner` constructor, and the 
same in `InputProcessorUtil.createCheckpointedInputGates`.
   WDYT about extracting it into something like `InputGateResolver` and passing 
it to barrier handlers, input gates, and whoever else needs it?
   
   Ideally, I think we should use self-sufficient IDs, but I guess this would 
be too invasive a change.
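
To make the suggestion concrete, here is a rough sketch of what such a resolver might look like. Everything in it is an assumption for illustration: the class name, the constructor taking per-gate channel counts, and the premise that gate indices are dense (0 to n-1) and the counts array is ordered by gate index.

```java
// Hypothetical illustration, not code from the PR.
final class InputGateResolverSketch {

    // gateChannelOffsets[g] is the flattened index of channel 0 of gate g.
    private final int[] gateChannelOffsets;
    private final int totalChannels;

    // Assumption: numberOfChannelsPerGate[g] is the channel count of the
    // gate whose gate index is g.
    InputGateResolverSketch(int[] numberOfChannelsPerGate) {
        gateChannelOffsets = new int[numberOfChannelsPerGate.length];
        int offset = 0;
        for (int gateIndex = 0; gateIndex < numberOfChannelsPerGate.length; gateIndex++) {
            gateChannelOffsets[gateIndex] = offset;
            offset += numberOfChannelsPerGate[gateIndex];
        }
        totalChannels = offset;
    }

    // Maps a (gate, channel) pair to a single index across all gates.
    int flattenedChannelIndex(int gateIndex, int channelIndex) {
        return gateChannelOffsets[gateIndex] + channelIndex;
    }

    int getTotalChannels() {
        return totalChannels;
    }

    public static void main(String[] args) {
        // Three gates with 2, 3, and 1 channels respectively.
        InputGateResolverSketch resolver = new InputGateResolverSketch(new int[] {2, 3, 1});
        System.out.println(resolver.flattenedChannelIndex(1, 0));
        System.out.println(resolver.getTotalChannels());
    }
}
```

With the offset computation in one place, the barrier handlers and the gate utilities would share a single definition of the flattened channel index instead of each rebuilding it.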





