AHeise commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r439030513



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
        private final ThreadSafeUnaligner threadSafeUnaligner;
 
        CheckpointBarrierUnaligner(
-                       int[] numberOfInputChannelsPerGate,
                        SubtaskCheckpointCoordinator checkpointCoordinator,
                        String taskName,
-                       AbstractInvokable toNotifyOnCheckpoint) {
+                       AbstractInvokable toNotifyOnCheckpoint,
+                       IndexedInputGate... inputGates) {
                super(toNotifyOnCheckpoint);
 
                this.taskName = taskName;
-
-               final int numGates = numberOfInputChannelsPerGate.length;
-
-               gateChannelOffsets = new int[numGates];
-               for (int index = 1; index < numGates; index++) {
-                       gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
-               }
-
-               final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1];
-               hasInflightBuffers = new boolean[totalNumChannels];
-
-               channelInfos = IntStream.range(0, numGates)
-                       .mapToObj(gateIndex -> IntStream.range(0, numberOfInputChannelsPerGate[gateIndex])
-                               .mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex)))
-                       .flatMap(Function.identity())
+               this.channelInfos = Arrays.stream(inputGates)
+                       .flatMap(gate -> gate.getChannels().stream().map(InputChannel::getChannelInfo))
                        .toArray(InputChannelInfo[]::new);
-
-               threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(checkpointCoordinator), this);
+               hasInflightBuffers = new boolean[channelInfos.length];
+               threadSafeUnaligner = new ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator), this);
+
+               gateChannelOffsets = new int[inputGates.length];
+               int offset = 0;
+               for (final IndexedInputGate gate: inputGates) {
+                       gateChannelOffsets[gate.getGateIndex()] = offset;
+                       offset += gate.getNumberOfInputChannels();
+               }

Review comment:
       That's a tough one; I had feared you would point it out. The issue is 
that the three places look similar but in the end are quite different.
   `CheckpointBarrierAligner` creates a gate->offset map because it also 
creates an index->gate array, which it uses in `resumeConsumption` to first 
resolve the gate for a given flattened channel index and then delegate 
`resumeConsumption` with the adjusted index (that's where the offset is needed).
   `CheckpointBarrierUnaligner` mainly uses the `InputChannelInfo` and hence 
creates a gate index->offset map to compute the flattened index for the other 
`BarrierHandler` logic.
   These two versions get the `SingleInputGates` (only they have indices).
   
   `InputProcessorUtil.createCheckpointedInputGates` uses union input gates, so 
it cannot use gate indices and goes by list index instead. However, now that I 
write it down, this also looks suspicious...
   I guess that to use simple offsets in `CheckpointedInput` (because of union 
gates), we actually need to assume consecutive indices... @pnowojski also saw 
some strange cases, however; maybe he can add his observations.
   



