pnowojski commented on a change in pull request #12575: URL: https://github.com/apache/flink/pull/12575#discussion_r440595683
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
##########
@@ -105,11 +105,11 @@ public UnionInputGate(IndexedInputGate... inputGates) {
         inputChannelToInputGateIndex = new int[totalNumberOfInputChannels];
         int currentNumberOfInputChannels = 0;
-        for (final IndexedInputGate inputGate : inputGates) {
-            inputGateChannelIndexOffsets[inputGate.getGateIndex()] = currentNumberOfInputChannels;
+        for (int index = 0; index < inputGates.length; index++) {

Review comment:
This seems to be going in the opposite direction: are we replacing indexing based on the real gate IDs with indexing based on the order of the array?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -71,108 +69,55 @@ public static CheckpointedInputGate createCheckpointedInputGate(
             String taskName,
             List<IndexedInputGate>... inputGates) {
-        IntStream numberOfInputChannelsPerGate =
-            Arrays
-                .stream(inputGates)
-                .flatMap(collection -> collection.stream())
-                .sorted(Comparator.comparingInt(IndexedInputGate::getGateIndex))
-                .mapToInt(InputGate::getNumberOfInputChannels);
-
-        Map<InputGate, Integer> inputGateToChannelIndexOffset = generateInputGateToChannelIndexOffsetMap(unionedInputGates);
-        // Note that numberOfInputChannelsPerGate and inputGateToChannelIndexOffset have a bit different
-        // indexing and purposes.
-        //
-        // The numberOfInputChannelsPerGate is indexed based on flattened input gates, and sorted based on GateIndex,
-        // so that it can be used in combination with InputChannelInfo class.
-        //
-        // The inputGateToChannelIndexOffset is based upon unioned input gates and it's use for translating channel
-        // indexes from perspective of UnionInputGate to perspective of SingleInputGate.
-
+        IndexedInputGate[] sortedInputGates = Arrays.stream(inputGates)
+            .flatMap(Collection::stream)
+            .sorted(Comparator.comparing(IndexedInputGate::getGateIndex))
+            .toArray(IndexedInputGate[]::new);
         CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
             config,
-            numberOfInputChannelsPerGate,
+            sortedInputGates,
             checkpointCoordinator,
             taskName,
-            generateChannelIndexToInputGateMap(unionedInputGates),
-            inputGateToChannelIndexOffset,
             toNotifyOnCheckpoint);
         registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
+        InputGate[] unionedInputGates = Arrays.stream(inputGates)

Review comment:
It's a bit confusing that above we have `sortedInputGates`, while the input gates here are not sorted. We end up in an inconsistent state where `CheckpointBarrierHandler#inputGates` can be indexed by `inputGateIndex` while `UnionInputGate#inputGates` cannot. I understand why this is so: the first is a flattened structure of all input gates, while the second holds only a subset of the gates. Maybe we can keep it as it is for now, since this commit is already simplifying things, but maybe we should replace the `UnionInputGate#inputGates` array with a map?
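
To make the map suggestion concrete, here is a minimal sketch of what keying by gate index could look like. It is not the actual Flink code: `Gate` is a hypothetical stand-in for `IndexedInputGate` that exposes only `getGateIndex()` and `getNumberOfInputChannels()`, and `channelOffsetsByGateIndex` is an illustrative helper name. The point is that a union holding only a subset of gates (here gates 3 and 1) can still be looked up by the real gate index, which a positional array cannot offer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GateIndexLookupSketch {

    /** Hypothetical stand-in for IndexedInputGate; not the Flink class. */
    static final class Gate {
        private final int gateIndex;
        private final int numberOfInputChannels;

        Gate(int gateIndex, int numberOfInputChannels) {
            this.gateIndex = gateIndex;
            this.numberOfInputChannels = numberOfInputChannels;
        }

        int getGateIndex() {
            return gateIndex;
        }

        int getNumberOfInputChannels() {
            return numberOfInputChannels;
        }
    }

    /**
     * Computes the first channel offset of every gate, keyed by the gate's
     * real index instead of its position in the argument array.
     */
    static Map<Integer, Integer> channelOffsetsByGateIndex(Gate... gates) {
        // LinkedHashMap keeps the union's iteration order stable.
        Map<Integer, Integer> offsets = new LinkedHashMap<>();
        int currentNumberOfInputChannels = 0;
        for (Gate gate : gates) {
            Integer previous = offsets.put(gate.getGateIndex(), currentNumberOfInputChannels);
            if (previous != null) {
                throw new IllegalArgumentException("Duplicate gate index " + gate.getGateIndex());
            }
            currentNumberOfInputChannels += gate.getNumberOfInputChannels();
        }
        return offsets;
    }

    public static void main(String[] args) {
        // A union over a subset of gates: index 3 (2 channels) and index 1 (4 channels).
        Map<Integer, Integer> offsets =
            channelOffsetsByGateIndex(new Gate(3, 2), new Gate(1, 4));
        System.out.println(offsets); // prints {3=0, 1=2}
    }
}
```

With a plain array of length 2, `offsets[3]` would be out of bounds, so callers would have to translate gate indexes into positions first. The map trades a small lookup cost for removing that translation step.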