pnowojski commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r440595683



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
##########
@@ -105,11 +105,11 @@ public UnionInputGate(IndexedInputGate... inputGates) {
                inputChannelToInputGateIndex = new int[totalNumberOfInputChannels];
 
                int currentNumberOfInputChannels = 0;
-               for (final IndexedInputGate inputGate : inputGates) {
-                       inputGateChannelIndexOffsets[inputGate.getGateIndex()] = currentNumberOfInputChannels;
+               for (int index = 0; index < inputGates.length; index++) {

Review comment:
       This seems to go in the opposite direction: aren't we replacing indexing based on the real gate IDs (`getGateIndex()`) with indexing based on the position in the array?
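
To make the concern concrete, here is a minimal sketch of the two indexing schemes. `GateStub` and `OffsetIndexingSketch` are hypothetical stand-ins invented for illustration and are not Flink classes; the only assumption carried over from the diff is that each gate exposes a gate index and a channel count.

```java
import java.util.Arrays;

// Hypothetical stand-in for IndexedInputGate: just a gate index and a channel count.
final class GateStub {
    final int gateIndex;
    final int numberOfInputChannels;

    GateStub(int gateIndex, int numberOfInputChannels) {
        this.gateIndex = gateIndex;
        this.numberOfInputChannels = numberOfInputChannels;
    }
}

final class OffsetIndexingSketch {

    // Offsets keyed by the gate's position in the array (the "order" variant).
    static int[] offsetsByPosition(GateStub[] gates) {
        int[] offsets = new int[gates.length];
        int current = 0;
        for (int i = 0; i < gates.length; i++) {
            offsets[i] = current;
            current += gates[i].numberOfInputChannels;
        }
        return offsets;
    }

    // Offsets keyed by the real gate index. The array is sized to the largest
    // gate index + 1, and slots for gates that are not present hold -1.
    static int[] offsetsByGateIndex(GateStub[] gates) {
        int maxIndex = -1;
        for (GateStub gate : gates) {
            maxIndex = Math.max(maxIndex, gate.gateIndex);
        }
        int[] offsets = new int[maxIndex + 1];
        Arrays.fill(offsets, -1);
        int current = 0;
        for (GateStub gate : gates) {
            offsets[gate.gateIndex] = current;
            current += gate.numberOfInputChannels;
        }
        return offsets;
    }
}
```

With gates whose indexes are not `0..n-1` in array order, the two arrays differ, so a caller holding a gate index cannot use the position-based array directly.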

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -71,108 +69,55 @@ public static CheckpointedInputGate createCheckpointedInputGate(
                        String taskName,
                        List<IndexedInputGate>... inputGates) {
 
-               IntStream numberOfInputChannelsPerGate =
-                       Arrays
-                               .stream(inputGates)
-                               .flatMap(collection -> collection.stream())
-                               .sorted(Comparator.comparingInt(IndexedInputGate::getGateIndex))
-                               .mapToInt(InputGate::getNumberOfInputChannels);
-
-               Map<InputGate, Integer> inputGateToChannelIndexOffset = generateInputGateToChannelIndexOffsetMap(unionedInputGates);
-               // Note that numberOfInputChannelsPerGate and inputGateToChannelIndexOffset have a bit different
-               // indexing and purposes.
-               //
-               // The numberOfInputChannelsPerGate is indexed based on flattened input gates, and sorted based on GateIndex,
-               // so that it can be used in combination with InputChannelInfo class.
-               //
-               // The inputGateToChannelIndexOffset is based upon unioned input gates and it's use for translating channel
-               // indexes from perspective of UnionInputGate to perspective of SingleInputGate.
-
+               IndexedInputGate[] sortedInputGates = Arrays.stream(inputGates)
+                       .flatMap(Collection::stream)
+                       .sorted(Comparator.comparing(IndexedInputGate::getGateIndex))
+                       .toArray(IndexedInputGate[]::new);
                CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
                        config,
-                       numberOfInputChannelsPerGate,
+                       sortedInputGates,
                        checkpointCoordinator,
                        taskName,
-                       generateChannelIndexToInputGateMap(unionedInputGates),
-                       inputGateToChannelIndexOffset,
                        toNotifyOnCheckpoint);
                registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
 
+               InputGate[] unionedInputGates = Arrays.stream(inputGates)

Review comment:
       It's a bit confusing that we have `sortedInputGates` above, while the input gates here are not sorted. We end up in an inconsistent state where `CheckpointBarrierHandler#inputGates` can be indexed by `inputGateIndex` but `UnionInputGate#inputGates` cannot.
   
   I understand why: the first is a flattened structure of all input gates, while the second holds only a subset of them. Maybe we can keep it as it is for now, since this commit already simplifies things, but perhaps we should replace the `UnionInputGate#inputGates` array with a map?
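
A rough sketch of what the map-based alternative could look like follows. `GateByIndexSketch` and its nested `Gate` class are hypothetical names used only for illustration, not the actual `UnionInputGate` code; the sketch assumes gate indexes are unique within one union.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GateByIndexSketch {

    // Hypothetical minimal gate: only the gate index matters for this sketch.
    static final class Gate {
        final int gateIndex;

        Gate(int gateIndex) {
            this.gateIndex = gateIndex;
        }
    }

    // Builds a lookup keyed by the real gate index, so a subset of gates
    // (for example indexes 1 and 3) can still be accessed by gate index.
    static Map<Integer, Gate> indexByGateIndex(List<Gate> gates) {
        Map<Integer, Gate> byIndex = new TreeMap<>();
        for (Gate gate : gates) {
            Gate previous = byIndex.put(gate.gateIndex, gate);
            if (previous != null) {
                throw new IllegalArgumentException("Duplicate gate index: " + gate.gateIndex);
            }
        }
        return Collections.unmodifiableMap(byIndex);
    }
}
```

A `TreeMap` is used here only so that iteration follows gate-index order; a `HashMap` would serve equally well if iteration order does not matter.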
