pnowojski commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r439306816



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
        private final ThreadSafeUnaligner threadSafeUnaligner;
 
        CheckpointBarrierUnaligner(
-                       int[] numberOfInputChannelsPerGate,
                        SubtaskCheckpointCoordinator checkpointCoordinator,
                        String taskName,
-                       AbstractInvokable toNotifyOnCheckpoint) {
+                       AbstractInvokable toNotifyOnCheckpoint,
+                       IndexedInputGate... inputGates) {
                super(toNotifyOnCheckpoint);
 
                this.taskName = taskName;
-
-               final int numGates = numberOfInputChannelsPerGate.length;
-
-               gateChannelOffsets = new int[numGates];
-               for (int index = 1; index < numGates; index++) {
-                       gateChannelOffsets[index] = gateChannelOffsets[index - 
1] + numberOfInputChannelsPerGate[index - 1];
-               }
-
-               final int totalNumChannels = gateChannelOffsets[numGates - 1] + 
numberOfInputChannelsPerGate[numGates - 1];
-               hasInflightBuffers = new boolean[totalNumChannels];
-
-               channelInfos = IntStream.range(0, numGates)
-                       .mapToObj(gateIndex -> IntStream.range(0, 
numberOfInputChannelsPerGate[gateIndex])
-                               .mapToObj(channelIndex -> new 
InputChannelInfo(gateIndex, channelIndex)))
-                       .flatMap(Function.identity())
+               this.channelInfos = Arrays.stream(inputGates)
+                       .flatMap(gate -> 
gate.getChannels().stream().map(InputChannel::getChannelInfo))
                        .toArray(InputChannelInfo[]::new);
-
-               threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, 
checkNotNull(checkpointCoordinator), this);
+               hasInflightBuffers = new boolean[channelInfos.length];
+               threadSafeUnaligner = new 
ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator), 
this);
+
+               gateChannelOffsets = new int[inputGates.length];
+               int offset = 0;
+               for (final IndexedInputGate gate: inputGates) {
+                       gateChannelOffsets[gate.getGateIndex()] = offset;
+                       offset += gate.getNumberOfInputChannels();
+               }

Review comment:
       @AHeise are you asking about 
`InputProcessorUtil#generateInputGateToChannelIndexOffsetMap` not using the 
input gate indexes? I'm not sure. I guess it is inconsistent with the 
`numberOfInputChannelsPerGate` but it might be correct, as all of the 
references/usages of `inputGateToChannelIndexOffset` are going through the 
offsetting that also uses `inputGateToChannelIndexOffset` (for example 
`CheckpointedInputGate#offsetChannelIndex`). But it's really hard to reason 
about.
   
   It's also quite strange that when polling records from input gate, we are 
first offsetting them once via using `CheckpointedInputGate#offsetChannelIndex` 
and then we are using basically the same information to revert the offset back, 
to the original value in `CheckpointBarrierAligner#resumeConsumption`.
   
   If we could simplify it by always using pair of ids without affecting 
performance that would be really nice...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to