tillrohrmann commented on a change in pull request #14868: URL: https://github.com/apache/flink/pull/14868#discussion_r584140805
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. */ +public class EdgeManager { + + private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers = + new HashMap<>(); + + private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions = + new HashMap<>(); + + public void addPartitionConsumers( + IntermediateResultPartitionID resultPartitionId, ConsumerVertexGroup consumerVertices) { + + checkState(!partitionConsumers.containsKey(resultPartitionId)); + + final List<ConsumerVertexGroup> consumers = getPartitionConsumers(resultPartitionId); + + // sanity check + checkState( + consumers.size() == 0, + "Currently there has to be exactly one consumer in real jobs"); + + consumers.add(consumerVertices); + } + + public void addVertexConsumedPartitions( + ExecutionVertexID executionVertexId, + ConsumedPartitionGroup partitions, + int inputNumber) { + + final List<ConsumedPartitionGroup> consumedPartitions = + getVertexConsumedPartitions(executionVertexId); + + // sanity check + checkState(consumedPartitions.size() == inputNumber); + + consumedPartitions.add(partitions); + } + + public List<ConsumerVertexGroup> getPartitionConsumers( + IntermediateResultPartitionID resultPartitionId) { + return partitionConsumers.computeIfAbsent(resultPartitionId, id -> new ArrayList<>()); + } + + public List<ConsumedPartitionGroup> getVertexConsumedPartitions( + ExecutionVertexID executionVertexId) { + return vertexConsumedPartitions.computeIfAbsent(executionVertexId, id -> new ArrayList<>()); + } + + public Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> + getAllPartitionConsumers() { + return partitionConsumers; + } Review comment: Where is this method used? Moreover, I would not return internal fields. At least we should wrap it in `Collections.unmodifiableMap` or so. 
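A minimal sketch of the suggested wrapping (assuming the getter is kept at all); note that `Collections.unmodifiableMap` protects only the map itself, while the `List` values inside stay mutable:

```java
public Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>>
        getAllPartitionConsumers() {
    // Read-only view over the internal map; writes still go through
    // addPartitionConsumers(). Requires java.util.Collections.
    return Collections.unmodifiableMap(partitionConsumers);
}
```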
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ########## @@ -133,7 +130,7 @@ public ExecutionVertex( resultPartitions.put(irp.getPartitionId(), irp); } - this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][]; + getExecutionGraph().registerExecutionVertex(executionVertexId, this); Review comment: This is a call to an overridable method during object construction. Ideally, we don't need these. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. */ +public class EdgeManager { Review comment: Do we need some tests for this class? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. */ +public class EdgeManager { + + private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers = + new HashMap<>(); + + private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions = + new HashMap<>(); + + public void addPartitionConsumers( + IntermediateResultPartitionID resultPartitionId, ConsumerVertexGroup consumerVertices) { + + checkState(!partitionConsumers.containsKey(resultPartitionId)); + + final List<ConsumerVertexGroup> consumers = getPartitionConsumers(resultPartitionId); + + // sanity check + checkState( + consumers.size() == 0, + "Currently there has to be exactly one consumer in real jobs"); + + consumers.add(consumerVertices); + } + + public void addVertexConsumedPartitions( + ExecutionVertexID executionVertexId, + ConsumedPartitionGroup partitions, + int inputNumber) { + + final List<ConsumedPartitionGroup> consumedPartitions = + getVertexConsumedPartitions(executionVertexId); + + // sanity check + checkState(consumedPartitions.size() == inputNumber); + + consumedPartitions.add(partitions); + } + + public List<ConsumerVertexGroup> getPartitionConsumers( + IntermediateResultPartitionID resultPartitionId) { + return partitionConsumers.computeIfAbsent(resultPartitionId, id -> new ArrayList<>()); + } + + public List<ConsumedPartitionGroup> getVertexConsumedPartitions( + ExecutionVertexID executionVertexId) { + return vertexConsumedPartitions.computeIfAbsent(executionVertexId, id -> new ArrayList<>()); + } Review comment: Also here I would be careful with returning internal fields which can be used to modify the class's internal values. From an encapsulation point of view it would be better to have a `getPartitionConsumersInternal` method which returns the internal list and a `getPartitionConsumers` which returns an unmodifiable version of it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. */ +public class EdgeManager { + + private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers = + new HashMap<>(); + + private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions = + new HashMap<>(); + + public void addPartitionConsumers( + IntermediateResultPartitionID resultPartitionId, ConsumerVertexGroup consumerVertices) { + + checkState(!partitionConsumers.containsKey(resultPartitionId)); + + final List<ConsumerVertexGroup> consumers = getPartitionConsumers(resultPartitionId); + + // sanity check + checkState( + consumers.size() == 0, + "Currently there has to be exactly one consumer in real jobs"); + + consumers.add(consumerVertices); + } + + public void addVertexConsumedPartitions( + ExecutionVertexID executionVertexId, + ConsumedPartitionGroup partitions, + int inputNumber) { + + final List<ConsumedPartitionGroup> consumedPartitions = + getVertexConsumedPartitions(executionVertexId); + + // sanity check + checkState(consumedPartitions.size() == inputNumber); Review comment: Does this mean that we have to add the consumed partitions in increasing order? If this is the contract, then we might wanna add a JavaDoc explaining this more explicitly. 
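If the increasing-order contract is indeed intended, a JavaDoc along these lines could make it explicit (the wording is only a suggestion, assuming the contract is as the check implies):

```java
/**
 * Registers the {@link ConsumedPartitionGroup} for one input of the given vertex.
 *
 * <p>Contract (assumed): inputs must be registered in increasing order of
 * {@code inputNumber}, starting at 0 and without gaps. The {@code checkState}
 * in the method body relies on the group for input {@code n} being added
 * only after inputs {@code 0..n-1}.
 */
public void addVertexConsumedPartitions(
        ExecutionVertexID executionVertexId,
        ConsumedPartitionGroup partitions,
        int inputNumber) {
    // ... unchanged body from the diff above ...
}
```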
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ########## @@ -321,107 +323,11 @@ public ExecutionGraph getExecutionGraph() { // Graph building // -------------------------------------------------------------------------------------------- - public void connectSource( - int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { - - final DistributionPattern pattern = edge.getDistributionPattern(); - final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); - - ExecutionEdge[] edges; - - switch (pattern) { - case POINTWISE: - edges = connectPointwise(sourcePartitions, inputNumber); - break; - - case ALL_TO_ALL: - edges = connectAllToAll(sourcePartitions, inputNumber); - break; - - default: - throw new RuntimeException("Unrecognized distribution pattern."); - } - - inputEdges[inputNumber] = edges; - - // add the consumers to the source - // for now (until the receiver initiated handshake is in place), we need to register the - // edges as the execution graph - for (ExecutionEdge ee : edges) { - ee.getSource().addConsumer(ee, consumerNumber); - } - } - - private ExecutionEdge[] connectAllToAll( - IntermediateResultPartition[] sourcePartitions, int inputNumber) { - ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; - - for (int i = 0; i < sourcePartitions.length; i++) { - IntermediateResultPartition irp = sourcePartitions[i]; - edges[i] = new ExecutionEdge(irp, this, inputNumber); - } - - return edges; - } - - private ExecutionEdge[] connectPointwise( - IntermediateResultPartition[] sourcePartitions, int inputNumber) { - final int numSources = sourcePartitions.length; - final int parallelism = getTotalNumberOfParallelSubtasks(); - - // simple case same number of sources as targets - if (numSources == parallelism) { - return new ExecutionEdge[] { - new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) - }; - } else if (numSources < parallelism) { - - int sourcePartition; - - // check if the pattern is regular or irregular - // we use int arithmetics for regular, and floating point with rounding for irregular - if (parallelism % numSources == 0) { - // same number of targets per source - int factor = parallelism / numSources; - sourcePartition = subTaskIndex / factor; - } else { - // different number of targets per source - float factor = ((float) parallelism) / numSources; - sourcePartition = (int) (subTaskIndex / factor); - } Review comment: This looks a bit different from what we have in the `EdgeManagerBuildUtil`. Why is this the case? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java ########## @@ -101,18 +102,18 @@ public TaskDeploymentDescriptor createDeploymentDescriptor( } private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors() { - List<InputGateDeploymentDescriptor> inputGates = new ArrayList<>(inputEdges.length); + List<InputGateDeploymentDescriptor> inputGates = new ArrayList<>(consumedPartitions.size()); - for (ExecutionEdge[] edges : inputEdges) { + for (List<IntermediateResultPartition> partitions : consumedPartitions) { // If the produced partition has multiple consumers registered, we // need to request the one matching our sub task index. 
// TODO Refactor after removing the consumers from the intermediate result partitions - int numConsumerEdges = edges[0].getSource().getConsumers().get(0).size(); + IntermediateResultPartition resultPartition = partitions.get(0); - int queueToRequest = subtaskIndex % numConsumerEdges; + int numConsumer = resultPartition.getConsumers().get(0).getVertices().size(); Review comment: `numConsumers` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collections; +import java.util.List; + +/** Group of consumed {@link IntermediateResultPartitionID}s. */ +public class ConsumedPartitionGroup { + private final List<IntermediateResultPartitionID> resultPartitions; + + public ConsumedPartitionGroup(List<IntermediateResultPartitionID> resultPartitions) { Review comment: Is it important that the `IntermediateResultPartitionIDs` are ordered or would `Collection<IntermediateResultPartitionID>` be good enough? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. 
*/ +public class EdgeManager { + + private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers = + new HashMap<>(); + + private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions = + new HashMap<>(); + + public void addPartitionConsumers( + IntermediateResultPartitionID resultPartitionId, ConsumerVertexGroup consumerVertices) { + + checkState(!partitionConsumers.containsKey(resultPartitionId)); + + final List<ConsumerVertexGroup> consumers = getPartitionConsumers(resultPartitionId); + + // sanity check + checkState( + consumers.size() == 0, + "Currently there has to be exactly one consumer in real jobs"); + + consumers.add(consumerVertices); + } + + public void addVertexConsumedPartitions( + ExecutionVertexID executionVertexId, + ConsumedPartitionGroup partitions, + int inputNumber) { + + final List<ConsumedPartitionGroup> consumedPartitions = + getVertexConsumedPartitions(executionVertexId); + + // sanity check + checkState(consumedPartitions.size() == inputNumber); + + consumedPartitions.add(partitions); + } + + public List<ConsumerVertexGroup> getPartitionConsumers( + IntermediateResultPartitionID resultPartitionId) { + return partitionConsumers.computeIfAbsent(resultPartitionId, id -> new ArrayList<>()); + } + + public List<ConsumedPartitionGroup> getVertexConsumedPartitions( + ExecutionVertexID executionVertexId) { + return vertexConsumedPartitions.computeIfAbsent(executionVertexId, id -> new ArrayList<>()); + } + + public Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> + getAllPartitionConsumers() { + return partitionConsumers; + } + + public Map<ExecutionVertexID, List<ConsumedPartitionGroup>> getAllVertexConsumedPartitions() { + return vertexConsumedPartitions; + } Review comment: Same for this method. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ########## @@ -668,6 +678,29 @@ public int getTotalNumberOfVertices() { }; } + public EdgeManager getEdgeManager() { + return edgeManager; + } + + public void registerExecutionVertex(ExecutionVertexID id, ExecutionVertex vertex) { + executionVerticesById.put(id, vertex); + } + + public void registerResultPartition( + IntermediateResultPartitionID id, IntermediateResultPartition partition) { + + resultPartitionsById.put(id, partition); + } + + public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) { + return checkNotNull(executionVerticesById.get(id)); + } + + public IntermediateResultPartition getResultPartitionOrThrow( + final IntermediateResultPartitionID id) { + return checkNotNull(resultPartitionsById.get(id)); + } Review comment: Same here, can these methods be package private? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for building {@link EdgeManager}. */ +public class EdgeManagerBuildUtil { + + public static void connectVertexToResult( + ExecutionJobVertex vertex, + IntermediateResult ires, + int inputNumber, + DistributionPattern distributionPattern) { + + switch (distributionPattern) { + case POINTWISE: + connectPointwise(vertex.getTaskVertices(), ires, inputNumber); + break; + case ALL_TO_ALL: + connectAllToAll(vertex.getTaskVertices(), ires, inputNumber); + break; + default: + throw new RuntimeException("Unrecognized distribution pattern."); + } + } + + private static void connectAllToAll( + ExecutionVertex[] taskVertices, IntermediateResult ires, int inputNumber) { + + ConsumedPartitionGroup consumedPartitions = + new ConsumedPartitionGroup( + Arrays.stream(ires.getPartitions()) + .map(IntermediateResultPartition::getPartitionId) + .collect(Collectors.toList())); Review comment: We could have two factory methods for the `ConsumedPartitionGroup`: `forMultiplePartitions(Collection)` and `forSinglePartition(Id)`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for building {@link EdgeManager}. 
*/ +public class EdgeManagerBuildUtil { + + public static void connectVertexToResult( + ExecutionJobVertex vertex, + IntermediateResult ires, + int inputNumber, + DistributionPattern distributionPattern) { + + switch (distributionPattern) { + case POINTWISE: + connectPointwise(vertex.getTaskVertices(), ires, inputNumber); + break; + case ALL_TO_ALL: + connectAllToAll(vertex.getTaskVertices(), ires, inputNumber); + break; + default: + throw new RuntimeException("Unrecognized distribution pattern."); + } + } + + private static void connectAllToAll( + ExecutionVertex[] taskVertices, IntermediateResult ires, int inputNumber) { + + ConsumedPartitionGroup consumedPartitions = + new ConsumedPartitionGroup( + Arrays.stream(ires.getPartitions()) + .map(IntermediateResultPartition::getPartitionId) + .collect(Collectors.toList())); + for (ExecutionVertex ev : taskVertices) { + ev.addConsumedPartitions(consumedPartitions, inputNumber); + } + + ConsumerVertexGroup vertices = + new ConsumerVertexGroup( + Arrays.stream(taskVertices) + .map(ExecutionVertex::getID) + .collect(Collectors.toList())); + for (IntermediateResultPartition partition : ires.getPartitions()) { + partition.addConsumers(vertices); + } + } + + private static void connectPointwise( + ExecutionVertex[] taskVertices, IntermediateResult ires, int inputNumber) { + + final int sourceCount = ires.getPartitions().length; + final int targetCount = taskVertices.length; + + if (sourceCount == targetCount) { + for (int i = 0; i < sourceCount; i++) { + ExecutionVertex executionVertex = taskVertices[i]; + IntermediateResultPartition partition = ires.getPartitions()[i]; + + ConsumerVertexGroup consumerVertexGroup = + new ConsumerVertexGroup(executionVertex.getID()); + partition.addConsumers(consumerVertexGroup); + + ConsumedPartitionGroup consumedPartitionGroup = + new ConsumedPartitionGroup(partition.getPartitionId()); + executionVertex.addConsumedPartitions(consumedPartitionGroup, inputNumber); + } + } else if (sourceCount > targetCount) { + for (int index = 0; index < targetCount; index++) { + + ExecutionVertex executionVertex = taskVertices[index]; + ConsumerVertexGroup consumerVertexGroup = + new ConsumerVertexGroup(executionVertex.getID()); + + int start = index * sourceCount / targetCount; + int end = (index + 1) * sourceCount / targetCount; + + List<IntermediateResultPartitionID> consumedPartitions = + new ArrayList<>(end - start); + + for (int i = start; i < end; i++) { + IntermediateResultPartition partition = ires.getPartitions()[i]; + partition.addConsumers(consumerVertexGroup); + + consumedPartitions.add(partition.getPartitionId()); + } + + ConsumedPartitionGroup consumedPartitionGroup = + new ConsumedPartitionGroup(consumedPartitions); + executionVertex.addConsumedPartitions(consumedPartitionGroup, inputNumber); + } + } else { + for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) { + + IntermediateResultPartition partition = ires.getPartitions()[partitionNum]; + ConsumedPartitionGroup consumerPartitionGroup = + new ConsumedPartitionGroup(partition.getPartitionId()); + + float factor = ((float) targetCount) / sourceCount; + int start = (int) (Math.ceil(partitionNum * factor)); + int end = (int) (Math.ceil((partitionNum + 1) * factor)); Review comment: Is there a reason why this calculation is different from the previous case? Why don't we use the same integer based formulas? 
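For what it's worth, the ceil-based float computation above has an exact integer counterpart, mirroring the floor-based integer arithmetic of the `sourceCount > targetCount` branch and avoiding float rounding altogether; a sketch, not a claim about the intended semantics:

```java
// Ceiling division for non-negative operands: ceil(a / b) == (a + b - 1) / b.
static int ceilDiv(int a, int b) {
    return (a + b - 1) / b;
}

// Integer-only equivalent of the start/end computation in the branch above:
int start = ceilDiv(partitionNum * targetCount, sourceCount);
int end = ceilDiv((partitionNum + 1) * targetCount, sourceCount);
```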
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import java.util.Collections; +import java.util.List; + +/** Group of consumer {@link ExecutionVertexID}s. */ +public class ConsumerVertexGroup { + private final List<ExecutionVertexID> vertices; + + public ConsumerVertexGroup(List<ExecutionVertexID> vertices) { Review comment: Same here with `List<ExecutionVertexID>` vs. `Collection<ExecutionVertexID>`. If not needed then it is better to use the more general interface. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for building {@link EdgeManager}. 
*/ +public class EdgeManagerBuildUtil { + + public static void connectVertexToResult( + ExecutionJobVertex vertex, + IntermediateResult ires, + int inputNumber, + DistributionPattern distributionPattern) { + + switch (distributionPattern) { + case POINTWISE: + connectPointwise(vertex.getTaskVertices(), ires, inputNumber); + break; + case ALL_TO_ALL: + connectAllToAll(vertex.getTaskVertices(), ires, inputNumber); + break; + default: + throw new RuntimeException("Unrecognized distribution pattern."); + } + } + + private static void connectAllToAll( + ExecutionVertex[] taskVertices, IntermediateResult ires, int inputNumber) { + + ConsumedPartitionGroup consumedPartitions = + new ConsumedPartitionGroup( + Arrays.stream(ires.getPartitions()) + .map(IntermediateResultPartition::getPartitionId) + .collect(Collectors.toList())); + for (ExecutionVertex ev : taskVertices) { + ev.addConsumedPartitions(consumedPartitions, inputNumber); + } + + ConsumerVertexGroup vertices = + new ConsumerVertexGroup( Review comment: Same for the `ConsumerVertexGroup` and the factory methods. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for building {@link EdgeManager}. */ +public class EdgeManagerBuildUtil { + + public static void connectVertexToResult( Review comment: Some JavaDocs could be helpful. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ########## @@ -454,12 +455,7 @@ public void connectToPredecessors( this.inputs.add(ires); - int consumerIndex = ires.registerConsumer(); - - for (int i = 0; i < parallelism; i++) { - ExecutionVertex ev = taskVertices[i]; - ev.connectSource(num, ires, edge, consumerIndex); - } + connectVertexToResult(this, ires, num, edge.getDistributionPattern()); Review comment: Personally I like to see where static methods come from. Hence, I think that `EdgeManagerBuildUtil.connectVertexToResult` is better.
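A sketch of the factory-method shape proposed in the comments above; `forMultiplePartitions` and `forSinglePartition` are the names suggested by the reviewer, while the private constructor and the defensive copy are assumptions:

```java
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class ConsumedPartitionGroup {
    private final List<IntermediateResultPartitionID> resultPartitions;

    // Private constructor: call sites must go through one of the named factories.
    private ConsumedPartitionGroup(List<IntermediateResultPartitionID> resultPartitions) {
        this.resultPartitions = resultPartitions;
    }

    public static ConsumedPartitionGroup forMultiplePartitions(
            Collection<IntermediateResultPartitionID> partitions) {
        return new ConsumedPartitionGroup(new ArrayList<>(partitions));
    }

    public static ConsumedPartitionGroup forSinglePartition(
            IntermediateResultPartitionID partition) {
        return new ConsumedPartitionGroup(Collections.singletonList(partition));
    }

    public List<IntermediateResultPartitionID> getResultPartitions() {
        return Collections.unmodifiableList(resultPartitions);
    }
}
```

A `ConsumerVertexGroup` with analogous factories (say, hypothetical `forMultipleVertices`/`forSingleVertex`) would look the same.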
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ########## @@ -668,6 +678,29 @@ public int getTotalNumberOfVertices() { }; } + public EdgeManager getEdgeManager() { Review comment: Can this be package private? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ########## @@ -503,16 +411,15 @@ public void connectSource( new HashSet<>(getTotalNumberOfParallelSubtasks()); // go over all inputs - for (int i = 0; i < inputEdges.length; i++) { + for (ConsumedPartitionGroup sources : allConsumedPartitions) { inputLocations.clear(); - ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { // go over all input sources Review comment: Comments seem to be outdated. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ########## @@ -321,107 +323,11 @@ public ExecutionGraph getExecutionGraph() { // Graph building // -------------------------------------------------------------------------------------------- - public void connectSource( - int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { - - final DistributionPattern pattern = edge.getDistributionPattern(); - final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); - - ExecutionEdge[] edges; - - switch (pattern) { - case POINTWISE: - edges = connectPointwise(sourcePartitions, inputNumber); - break; - - case ALL_TO_ALL: - edges = connectAllToAll(sourcePartitions, inputNumber); - break; - - default: - throw new RuntimeException("Unrecognized distribution pattern."); - } - - inputEdges[inputNumber] = edges; - - // add the consumers to the source - // for now (until the receiver initiated handshake is in place), we need to register the - // edges as the execution graph - for (ExecutionEdge ee : edges) { - ee.getSource().addConsumer(ee, consumerNumber); - } - } - - private ExecutionEdge[] connectAllToAll( - IntermediateResultPartition[] sourcePartitions, int inputNumber) { - ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; - - for (int i = 0; i < sourcePartitions.length; i++) { - IntermediateResultPartition irp = sourcePartitions[i]; - edges[i] = new ExecutionEdge(irp, this, inputNumber); - } - - return edges; - } - - private ExecutionEdge[] connectPointwise( - IntermediateResultPartition[] sourcePartitions, int inputNumber) { - final int numSources = sourcePartitions.length; - final int parallelism = getTotalNumberOfParallelSubtasks(); - - // simple case same number of sources as targets - if (numSources == parallelism) { - return new ExecutionEdge[] { - new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) - }; - } else if (numSources < parallelism) { - - int sourcePartition; - - // check if the pattern is regular or irregular - // we use int arithmetics for regular, and floating point with rounding for irregular - if (parallelism % numSources == 0) { - // same number of targets per source - int factor = parallelism / numSources; - sourcePartition = subTaskIndex / factor; - } else { - // different number of targets per source - float factor = ((float) parallelism) / numSources; - sourcePartition = (int) (subTaskIndex / factor); - } - - return new ExecutionEdge[] { - new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) - }; - } else { - if (numSources % parallelism == 0) { - // same number of targets per source - int factor = numSources / parallelism; - int startIndex 
= subTaskIndex * factor; - - ExecutionEdge[] edges = new ExecutionEdge[factor]; - for (int i = 0; i < factor; i++) { - edges[i] = - new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); - } - return edges; - } else { - float factor = ((float) numSources) / parallelism; + public void addConsumedPartitions(ConsumedPartitionGroup consumedPartitions, int inputNum) { - int start = (int) (subTaskIndex * factor); - int end = - (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) - ? sourcePartitions.length - : (int) ((subTaskIndex + 1) * factor); Review comment: Same here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ########## @@ -503,16 +411,15 @@ public void connectSource( new HashSet<>(getTotalNumberOfParallelSubtasks()); // go over all inputs - for (int i = 0; i < inputEdges.length; i++) { + for (ConsumedPartitionGroup sources : allConsumedPartitions) { inputLocations.clear(); - ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { // go over all input sources - for (int k = 0; k < sources.length; k++) { Review comment: Is `sources` the right variable name here? What about `consumedPartitionGroup`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for building {@link EdgeManager}. */ +public class EdgeManagerBuildUtil { + + public static void connectVertexToResult( + ExecutionJobVertex vertex, + IntermediateResult ires, + int inputNumber, + DistributionPattern distributionPattern) { + + switch (distributionPattern) { + case POINTWISE: + connectPointwise(vertex.getTaskVertices(), ires, inputNumber); + break; + case ALL_TO_ALL: + connectAllToAll(vertex.getTaskVertices(), ires, inputNumber); + break; + default: + throw new RuntimeException("Unrecognized distribution pattern."); Review comment: `IllegalArgumentException` would be better here. 
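Applying that suggestion, the `default` branch could also include the offending value in the message, which is cheap and helps debugging (a sketch):

```java
default:
    // Unchecked exception signalling a caller bug rather than a runtime failure.
    throw new IllegalArgumentException(
            "Unrecognized distribution pattern: " + distributionPattern);
```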
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for building {@link EdgeManager}. */ +public class EdgeManagerBuildUtil { + + public static void connectVertexToResult( Review comment: Do we have some tests for this method? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. 
*/ +public class EdgeManager { + + private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers = + new HashMap<>(); + + private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions = + new HashMap<>(); + + public void addPartitionConsumers( + IntermediateResultPartitionID resultPartitionId, ConsumerVertexGroup consumerVertices) { + + checkState(!partitionConsumers.containsKey(resultPartitionId)); + + final List<ConsumerVertexGroup> consumers = getPartitionConsumers(resultPartitionId); + + // sanity check + checkState( + consumers.size() == 0, Review comment: `consumers.isEmpty()` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks. */ +public class EdgeManager { Review comment: Can this be package private? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collections; +import java.util.List; + +/** Group of consumed {@link IntermediateResultPartitionID}s. 
*/ +public class ConsumedPartitionGroup { + private final List<IntermediateResultPartitionID> resultPartitions; + + public ConsumedPartitionGroup(List<IntermediateResultPartitionID> resultPartitions) { + this.resultPartitions = resultPartitions; + } + + public ConsumedPartitionGroup(IntermediateResultPartitionID resultPartition) { + this(Collections.singletonList(resultPartition)); + } + + public List<IntermediateResultPartitionID> getResultPartitions() { + return Collections.unmodifiableList(resultPartitions); + } Review comment: We could hide the implementation detail by letting `ConsumedPartitionGroup` implement the methods we need in order to work with it directly. For example, if it implements `size()` and `Iterable`, that should already go a long way. Maybe we also need `get(int index)`. The same applies to the `ConsumerVertexGroup`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Class that manages all the connections between tasks.
*/ +public class EdgeManager { + + private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers = + new HashMap<>(); + + private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions = + new HashMap<>(); + + public void addPartitionConsumers( + IntermediateResultPartitionID resultPartitionId, ConsumerVertexGroup consumerVertices) { + + checkState(!partitionConsumers.containsKey(resultPartitionId)); + + final List<ConsumerVertexGroup> consumers = getPartitionConsumers(resultPartitionId); + + // sanity check + checkState( + consumers.size() == 0, + "Currently there has to be exactly one consumer in real jobs"); + + consumers.add(consumerVertices); + } + + public void addVertexConsumedPartitions( + ExecutionVertexID executionVertexId, + ConsumedPartitionGroup partitions, + int inputNumber) { + + final List<ConsumedPartitionGroup> consumedPartitions = + getVertexConsumedPartitions(executionVertexId); + + // sanity check + checkState(consumedPartitions.size() == inputNumber); Review comment: Alternatively we could change the API so that one needs to add the whole `Collection<ConsumedPartitionGroup>` at once when adding an `ExecutionVertexID`.
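A hypothetical sketch of that alternative inside `EdgeManager`: all consumed partition groups of a vertex are registered in a single call, which removes the implicit per-input ordering contract. The method name and the duplicate-registration check are assumptions, not part of the PR:

```java
public void setVertexConsumedPartitions(
        ExecutionVertexID executionVertexId,
        List<ConsumedPartitionGroup> consumedPartitionGroups) {

    // A vertex's inputs are registered exactly once, all together; the list
    // index plays the role of the former inputNumber argument.
    checkState(
            !vertexConsumedPartitions.containsKey(executionVertexId),
            "Consumed partitions were already registered for this vertex.");

    vertexConsumedPartitions.put(
            executionVertexId, new ArrayList<>(consumedPartitionGroups));
}
```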
