tillrohrmann commented on a change in pull request #14868:
URL: https://github.com/apache/flink/pull/14868#discussion_r586228265
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -93,21 +89,12 @@ void resetForNewExecution() {
hasDataProduced = false;
}
- int addConsumerGroup() {
- int pos = consumers.size();
-
- // NOTE: currently we support only one consumer per result!!!
- if (pos != 0) {
- throw new RuntimeException(
- "Currently, each intermediate result can only have one consumer.");
- }
-
- consumers.add(new ArrayList<ExecutionEdge>());
- return pos;
+ public void addConsumers(ConsumerVertexGroup consumers) {
+ getEdgeManager().addPartitionConsumers(partitionId, consumers);
}
- void addConsumer(ExecutionEdge edge, int consumerNumber) {
- consumers.get(consumerNumber).add(edge);
+ private EdgeManager getEdgeManager() {
+ return producer.getExecutionGraph().getEdgeManager();
}
Review comment:
Theoretically yes. But the `ExecutionVertex` is already coupled quite
tightly to the `ExecutionGraph`. Hence, not passing it in might not make a
big difference.
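
To make the trade-off concrete, here is a minimal sketch, assuming "it" refers to the `EdgeManager`. All classes below are simplified, hypothetical stand-ins rather than the real Flink types: partition ids and consumer groups are plain strings, and `LookupPartition` / `InjectedPartition` are made-up names. Variant A mirrors the diff above (lookup through the producer), variant B shows what passing the manager in could look like.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: records consumer groups per partition id.
class EdgeManager {
    private final Map<String, List<String>> partitionConsumers = new HashMap<>();

    void addPartitionConsumers(String partitionId, String consumerGroup) {
        partitionConsumers
                .computeIfAbsent(partitionId, k -> new ArrayList<>())
                .add(consumerGroup);
    }

    List<String> getConsumers(String partitionId) {
        return partitionConsumers.getOrDefault(partitionId, new ArrayList<>());
    }
}

// Hypothetical stand-in: owns the single EdgeManager instance.
class ExecutionGraph {
    private final EdgeManager edgeManager = new EdgeManager();

    EdgeManager getEdgeManager() {
        return edgeManager;
    }
}

// Hypothetical stand-in: the vertex already holds a reference to its graph.
class ExecutionVertex {
    private final ExecutionGraph graph;

    ExecutionVertex(ExecutionGraph graph) {
        this.graph = graph;
    }

    ExecutionGraph getExecutionGraph() {
        return graph;
    }
}

// Variant A (as in the diff): reach the EdgeManager through the producer.
class LookupPartition {
    private final String partitionId;
    private final ExecutionVertex producer;

    LookupPartition(String partitionId, ExecutionVertex producer) {
        this.partitionId = partitionId;
        this.producer = producer;
    }

    void addConsumers(String consumers) {
        producer.getExecutionGraph()
                .getEdgeManager()
                .addPartitionConsumers(partitionId, consumers);
    }
}

// Variant B (the alternative): the EdgeManager is passed in explicitly.
class InjectedPartition {
    private final String partitionId;
    private final EdgeManager edgeManager;

    InjectedPartition(String partitionId, EdgeManager edgeManager) {
        this.partitionId = partitionId;
        this.edgeManager = edgeManager;
    }

    void addConsumers(String consumers) {
        edgeManager.addPartitionConsumers(partitionId, consumers);
    }
}
```

Both variants end up writing to the same `EdgeManager`. Variant B removes one hop through the graph, but whoever constructs the partition still needs the graph (or the vertex) to obtain the manager, so in this sketch the coupling mostly moves to the construction site rather than disappearing.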