pnowojski commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r439306816
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
private final ThreadSafeUnaligner threadSafeUnaligner;
CheckpointBarrierUnaligner(
- int[] numberOfInputChannelsPerGate,
SubtaskCheckpointCoordinator checkpointCoordinator,
String taskName,
- AbstractInvokable toNotifyOnCheckpoint) {
+ AbstractInvokable toNotifyOnCheckpoint,
+ IndexedInputGate... inputGates) {
super(toNotifyOnCheckpoint);
this.taskName = taskName;
-
- final int numGates = numberOfInputChannelsPerGate.length;
-
- gateChannelOffsets = new int[numGates];
- for (int index = 1; index < numGates; index++) {
- gateChannelOffsets[index] = gateChannelOffsets[index -
1] + numberOfInputChannelsPerGate[index - 1];
- }
-
- final int totalNumChannels = gateChannelOffsets[numGates - 1] +
numberOfInputChannelsPerGate[numGates - 1];
- hasInflightBuffers = new boolean[totalNumChannels];
-
- channelInfos = IntStream.range(0, numGates)
- .mapToObj(gateIndex -> IntStream.range(0,
numberOfInputChannelsPerGate[gateIndex])
- .mapToObj(channelIndex -> new
InputChannelInfo(gateIndex, channelIndex)))
- .flatMap(Function.identity())
+ this.channelInfos = Arrays.stream(inputGates)
+ .flatMap(gate ->
gate.getChannels().stream().map(InputChannel::getChannelInfo))
.toArray(InputChannelInfo[]::new);
-
- threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels,
checkNotNull(checkpointCoordinator), this);
+ hasInflightBuffers = new boolean[channelInfos.length];
+ threadSafeUnaligner = new
ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator),
this);
+
+ gateChannelOffsets = new int[inputGates.length];
+ int offset = 0;
+ for (final IndexedInputGate gate: inputGates) {
+ gateChannelOffsets[gate.getGateIndex()] = offset;
+ offset += gate.getNumberOfInputChannels();
+ }
Review comment:
@AHeise are you asking about
`InputProcessorUtil#generateInputGateToChannelIndexOffsetMap` not using the
input gate indexes? I'm not sure. I guess it is inconsistent with the
`numberOfInputChannelsPerGate` but it might be correct, as all of the
references/usages of `inputGateToChannelIndexOffset` are going through the
offsetting that also uses `inputGateToChannelIndexOffset` (for example
`CheckpointedInputGate#offsetChannelIndex`). But it's really hard to reason
about.
It's also quite strange that when polling records from input gate, we are
first offsetting them once via using `CheckpointedInputGate#offsetChannelIndex`
and then we are using basically the same information to revert the offset back,
to the original value in `CheckpointBarrierAligner#resumeConsumption`.
If we could simplify it by always using pair of ids without affecting
performance that would be really nice...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]