pnowojski commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r440595683
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
##########
@@ -105,11 +105,11 @@ public UnionInputGate(IndexedInputGate... inputGates) {
inputChannelToInputGateIndex = new int[totalNumberOfInputChannels];
int currentNumberOfInputChannels = 0;
- for (final IndexedInputGate inputGate : inputGates) {
- inputGateChannelIndexOffsets[inputGate.getGateIndex()] = currentNumberOfInputChannels;
+ for (int index = 0; index < inputGates.length; index++) {
Review comment:
This seems to be going in the opposite direction: aren't we replacing indexing based on the real gate IDs (`getGateIndex()`) with indexing based on the order of the gates?
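To make the concern concrete, here is a minimal sketch (not Flink code) of the two ways of filling the offsets array. `Gate`, `offsetsByGateIndex` and `offsetsByPosition` are hypothetical stand-ins for `IndexedInputGate` and the removed/added loops in the hunk above, and the sketch assumes gate IDs run exactly from 0 to n-1. When the gates are passed out of ID order, the two arrays disagree:

```java
import java.util.Arrays;

public class GateOffsetSketch {
    // Hypothetical stand-in for IndexedInputGate, reduced to the two properties used here.
    static final class Gate {
        final int gateIndex;
        final int numberOfChannels;

        Gate(int gateIndex, int numberOfChannels) {
            this.gateIndex = gateIndex;
            this.numberOfChannels = numberOfChannels;
        }
    }

    // Like the removed loop: the offset is stored at the gate's own ID.
    // Assumes gate IDs are exactly 0..gates.length-1.
    static int[] offsetsByGateIndex(Gate... gates) {
        int[] offsets = new int[gates.length];
        int current = 0;
        for (Gate gate : gates) {
            offsets[gate.gateIndex] = current;
            current += gate.numberOfChannels;
        }
        return offsets;
    }

    // Like the added loop: the offset is stored at the gate's position in the argument array.
    static int[] offsetsByPosition(Gate... gates) {
        int[] offsets = new int[gates.length];
        int current = 0;
        for (int index = 0; index < gates.length; index++) {
            offsets[index] = current;
            current += gates[index].numberOfChannels;
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Gate 1 (3 channels) is passed before gate 0 (2 channels).
        Gate[] gates = {new Gate(1, 3), new Gate(0, 2)};
        System.out.println(Arrays.toString(offsetsByGateIndex(gates))); // [3, 0]
        System.out.println(Arrays.toString(offsetsByPosition(gates)));  // [0, 3]
    }
}
```

With ID-based indexing, `offsets[i]` always belongs to gate `i`; with position-based indexing it belongs to whichever gate happened to be passed `i`-th, so any caller that looks an offset up by `getGateIndex()` would silently get another gate's value.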
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -71,108 +69,55 @@ public static CheckpointedInputGate createCheckpointedInputGate(
String taskName,
List<IndexedInputGate>... inputGates) {
- IntStream numberOfInputChannelsPerGate =
- Arrays
- .stream(inputGates)
- .flatMap(collection -> collection.stream())
- .sorted(Comparator.comparingInt(IndexedInputGate::getGateIndex))
- .mapToInt(InputGate::getNumberOfInputChannels);
-
- Map<InputGate, Integer> inputGateToChannelIndexOffset = generateInputGateToChannelIndexOffsetMap(unionedInputGates);
- // Note that numberOfInputChannelsPerGate and inputGateToChannelIndexOffset have a bit different
- // indexing and purposes.
- //
- // The numberOfInputChannelsPerGate is indexed based on flattened input gates, and sorted based on GateIndex,
- // so that it can be used in combination with InputChannelInfo class.
- //
- // The inputGateToChannelIndexOffset is based upon unioned input gates and it's use for translating channel
- // indexes from perspective of UnionInputGate to perspective of SingleInputGate.
-
+ IndexedInputGate[] sortedInputGates = Arrays.stream(inputGates)
+ .flatMap(Collection::stream)
+ .sorted(Comparator.comparing(IndexedInputGate::getGateIndex))
+ .toArray(IndexedInputGate[]::new);
CheckpointBarrierHandler barrierHandler =
createCheckpointBarrierHandler(
config,
- numberOfInputChannelsPerGate,
+ sortedInputGates,
checkpointCoordinator,
taskName,
- generateChannelIndexToInputGateMap(unionedInputGates),
- inputGateToChannelIndexOffset,
toNotifyOnCheckpoint);
registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
+ InputGate[] unionedInputGates = Arrays.stream(inputGates)
Review comment:
It's a bit confusing that above we have `sortedInputGates`, while the input gates here are not sorted. We end up in a confusing state where `CheckpointBarrierHandler#inputGates` can be accessed via `inputGateIndex`, while `UnionInputGate#inputGates` cannot be.
I understand why it is so: the first one is a flattened structure of all input gates, while the other holds only a subset of the gates. Maybe we can keep it as it is for now, since this commit already simplifies things, but should we eventually replace the `UnionInputGate#inputGates` array with a map?
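A rough sketch of what the map-based alternative could look like, assuming the union only ever needs lookups by global gate index. `Gate` and `UnionGate` are hypothetical simplifications of `IndexedInputGate` and `UnionInputGate`, not the actual classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnionGateMapSketch {
    // Hypothetical stand-in for IndexedInputGate.
    static final class Gate {
        private final int gateIndex;
        private final int numberOfInputChannels;

        Gate(int gateIndex, int numberOfInputChannels) {
            this.gateIndex = gateIndex;
            this.numberOfInputChannels = numberOfInputChannels;
        }

        int getGateIndex() {
            return gateIndex;
        }

        int getNumberOfInputChannels() {
            return numberOfInputChannels;
        }
    }

    // Hypothetical union that keys its gates by global gate index instead of array position.
    static final class UnionGate {
        // LinkedHashMap keeps the order in which gates were passed, for iteration.
        private final Map<Integer, Gate> gatesByIndex = new LinkedHashMap<>();

        UnionGate(Gate... gates) {
            for (Gate gate : gates) {
                if (gatesByIndex.putIfAbsent(gate.getGateIndex(), gate) != null) {
                    throw new IllegalArgumentException("Duplicate gate index " + gate.getGateIndex());
                }
            }
        }

        // Lookup by the same global index that works on the flattened, sorted array.
        Gate getGate(int gateIndex) {
            Gate gate = gatesByIndex.get(gateIndex);
            if (gate == null) {
                throw new IllegalArgumentException("Gate " + gateIndex + " is not part of this union");
            }
            return gate;
        }
    }

    public static void main(String[] args) {
        // This union only sees a subset of the task's gates: 3 and 1.
        UnionGate union = new UnionGate(new Gate(3, 4), new Gate(1, 2));
        System.out.println(union.getGate(1).getNumberOfInputChannels()); // 2
        System.out.println(union.getGate(3).getNumberOfInputChannels()); // 4
    }
}
```

Because the map is keyed by `getGateIndex()` rather than by position, a union holding only a subset of the gates can still be queried with the same `inputGateIndex` that works for `CheckpointBarrierHandler#inputGates`; the trade-off is a boxed map lookup instead of a plain array access.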