AHeise commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r439030513
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
private final ThreadSafeUnaligner threadSafeUnaligner;
CheckpointBarrierUnaligner(
- int[] numberOfInputChannelsPerGate,
SubtaskCheckpointCoordinator checkpointCoordinator,
String taskName,
- AbstractInvokable toNotifyOnCheckpoint) {
+ AbstractInvokable toNotifyOnCheckpoint,
+ IndexedInputGate... inputGates) {
super(toNotifyOnCheckpoint);
this.taskName = taskName;
-
- final int numGates = numberOfInputChannelsPerGate.length;
-
- gateChannelOffsets = new int[numGates];
- for (int index = 1; index < numGates; index++) {
- gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
- }
-
- final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1];
- hasInflightBuffers = new boolean[totalNumChannels];
-
- channelInfos = IntStream.range(0, numGates)
- .mapToObj(gateIndex -> IntStream.range(0, numberOfInputChannelsPerGate[gateIndex])
- .mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex)))
- .flatMap(Function.identity())
+ this.channelInfos = Arrays.stream(inputGates)
+ .flatMap(gate -> gate.getChannels().stream().map(InputChannel::getChannelInfo))
.toArray(InputChannelInfo[]::new);
-
- threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(checkpointCoordinator), this);
+ hasInflightBuffers = new boolean[channelInfos.length];
+ threadSafeUnaligner = new ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator), this);
+
+ gateChannelOffsets = new int[inputGates.length];
+ int offset = 0;
+ for (final IndexedInputGate gate: inputGates) {
+ gateChannelOffsets[gate.getGateIndex()] = offset;
+ offset += gate.getNumberOfInputChannels();
+ }
Review comment:
That's a tough one; I had feared you would point it out. The issue is that
the implementations all look similar but in the end are quite different.
`CheckpointBarrierAligner` creates a gate->offset map because it also
creates an index->gate array, which it uses in `resumeConsumption` to first
resolve the gate for a given flattened channel index and then delegate
`resumeConsumption` with the adjusted index (that's where the offset is needed).
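
To make the aligner-style lookup concrete, here is a minimal sketch of resolving a flattened channel index back to its gate and gate-local index. It is not the Flink code: `FlatIndexResolutionSketch`, its nested `Gate` class, `gateOf` and `localIndexOf` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not Flink code: flattened channel index -> (gate, local index).
public final class FlatIndexResolutionSketch {

    /** Hypothetical gate: only a name and a channel count. */
    static final class Gate {
        final String name;
        final int numberOfChannels;

        Gate(String name, int numberOfChannels) {
            this.name = name;
            this.numberOfChannels = numberOfChannels;
        }
    }

    // index->gate array: one entry per flattened channel index.
    private final Gate[] channelIndexToGate;
    // gate->offset map: flattened index of each gate's first channel.
    private final Map<Gate, Integer> gateToOffset = new HashMap<>();

    FlatIndexResolutionSketch(Gate... gates) {
        int total = 0;
        for (Gate gate : gates) {
            total += gate.numberOfChannels;
        }
        channelIndexToGate = new Gate[total];
        int offset = 0;
        for (Gate gate : gates) {
            gateToOffset.put(gate, offset);
            for (int i = 0; i < gate.numberOfChannels; i++) {
                channelIndexToGate[offset + i] = gate;
            }
            offset += gate.numberOfChannels;
        }
    }

    /** Resolves the gate that owns the given flattened channel index. */
    Gate gateOf(int flattenedIndex) {
        return channelIndexToGate[flattenedIndex];
    }

    /** Adjusts the flattened index to the gate-local channel index. */
    int localIndexOf(int flattenedIndex) {
        return flattenedIndex - gateToOffset.get(gateOf(flattenedIndex));
    }

    public static void main(String[] args) {
        Gate a = new Gate("a", 2);
        Gate b = new Gate("b", 3);
        FlatIndexResolutionSketch sketch = new FlatIndexResolutionSketch(a, b);
        System.out.println(sketch.gateOf(3).name);   // prints "b"
        System.out.println(sketch.localIndexOf(3));  // prints 1
    }
}
```

The offset is needed only in the second step, when the flattened index is turned into the index the gate itself understands.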
`CheckpointBarrierUnaligner` mainly uses the `InputChannelInfo` and hence
creates a gate index->offset map to compute the flattened index for the other
`BarrierHandler` code.
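
As a rough illustration of the unaligner-style direction, the sketch below builds a gate index->offset array the same way the loop in the diff does and uses it to flatten a (gate index, channel index) pair. `GateOffsetSketch`, `Gate`, `offsetsByGateIndex` and `flatten` are hypothetical names, and the sketch assumes the gate indices are exactly 0..n-1, each used once.

```java
import java.util.Arrays;

// Hypothetical stand-in, not Flink code: (gateIndex, channelIndex) -> flattened index.
public final class GateOffsetSketch {

    /** Hypothetical gate: only its index and its channel count. */
    static final class Gate {
        final int gateIndex;
        final int numberOfChannels;

        Gate(int gateIndex, int numberOfChannels) {
            this.gateIndex = gateIndex;
            this.numberOfChannels = numberOfChannels;
        }
    }

    /**
     * Returns an array where entry i is the flattened index of the first channel of gate i.
     * Assumption: gate indices are a permutation of 0..gates.length-1; any other index
     * would fall outside the array.
     */
    static int[] offsetsByGateIndex(Gate... gates) {
        int[] offsets = new int[gates.length];
        int offset = 0;
        for (Gate gate : gates) {
            offsets[gate.gateIndex] = offset;
            offset += gate.numberOfChannels;
        }
        return offsets;
    }

    /** Flattens a (gateIndex, channelIndex) pair into a single channel index. */
    static int flatten(int[] offsets, int gateIndex, int channelIndex) {
        return offsets[gateIndex] + channelIndex;
    }

    public static void main(String[] args) {
        int[] offsets = offsetsByGateIndex(new Gate(0, 2), new Gate(1, 3), new Gate(2, 1));
        System.out.println(Arrays.toString(offsets)); // prints [0, 2, 5]
        System.out.println(flatten(offsets, 1, 2));   // prints 4
    }
}
```

Because the array is sized by the number of gates, a gate index that is not in 0..n-1 would throw here, which is the same consecutive-indices assumption discussed in this thread.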
These two versions get the `SingleInputGate`s (only they have indices).
`InputProcessorUtil.createCheckpointedInputGates` uses union input gates, so
it cannot use gate indices and goes by list index instead. However, now that I
write it down, this also looks suspicious...
I guess that to use simple offsets in `CheckpointedInput` (because of union
gates), we actually need to assume consecutive indices... @pnowojski has also
seen some strange cases, though; maybe he can add his observations.