tillrohrmann commented on a change in pull request #14868:
URL: https://github.com/apache/flink/pull/14868#discussion_r585674612
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
##########
@@ -493,26 +399,27 @@ public void connectSource(
* input-based preference.
*/
public Collection<CompletableFuture<TaskManagerLocation>>
getPreferredLocationsBasedOnInputs() {
+ final List<ConsumedPartitionGroup> allConsumedPartitions =
getAllConsumedPartitions();
+
// otherwise, base the preferred locations on the input connections
- if (inputEdges == null) {
+ if (allConsumedPartitions == null) {
Review comment:
Can `allConsumedPartitions` ever be `null`? If so, then I would suggest
letting `getAllConsumedPartitions` return `Collections.emptyList()` instead.
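To illustrate the suggestion, here is a minimal sketch of a getter that returns an empty list instead of `null`, so callers can iterate without a null check. The class `EmptyListSketch`, the `String` keys and values, and `countInputs` are hypothetical stand-ins for illustration only; they are not the Flink types from this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in; not the actual ExecutionVertex/EdgeManager code.
class EmptyListSketch {

    // Maps a vertex id to the groups it consumes (plain strings for illustration).
    private final Map<String, List<String>> consumedByVertex = new HashMap<>();

    // Returns an immutable empty list when nothing is registered, never null.
    List<String> getAllConsumedPartitions(String vertexId) {
        return consumedByVertex.getOrDefault(vertexId, Collections.emptyList());
    }

    // Callers can loop directly; the empty case simply skips the loop body.
    int countInputs(String vertexId) {
        int count = 0;
        for (String ignored : getAllConsumedPartitions(vertexId)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        EmptyListSketch sketch = new EmptyListSketch();
        System.out.println(sketch.countInputs("unknown-vertex"));
    }
}
```

With a getter like this, the `== null` branch in `getPreferredLocationsBasedOnInputs` would presumably become unnecessary, since an empty input list already yields an empty result.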
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for building {@link EdgeManager}. */
+public class EdgeManagerBuildUtil {
+
+ /**
+ * Calculate the connections between {@link ExecutionJobVertex} and {@link
IntermediateResult}
+ * based on the {@link DistributionPattern}. The connection information is
stored in the {@link
+ * EdgeManager}.
Review comment:
Looking at the code, it seems that no `EdgeManager` is used directly
for storing the results. I guess it is an implementation detail that the
`EdgeManager` is used internally. Hence, I would suggest removing this
detail from the JavaDocs. Moreover, proper JavaDocs should also explain
the parameters.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
##########
@@ -493,26 +399,27 @@ public void connectSource(
* input-based preference.
*/
public Collection<CompletableFuture<TaskManagerLocation>>
getPreferredLocationsBasedOnInputs() {
+ final List<ConsumedPartitionGroup> allConsumedPartitions =
getAllConsumedPartitions();
+
// otherwise, base the preferred locations on the input connections
- if (inputEdges == null) {
+ if (allConsumedPartitions == null) {
return Collections.emptySet();
} else {
Set<CompletableFuture<TaskManagerLocation>> locations =
new HashSet<>(getTotalNumberOfParallelSubtasks());
Set<CompletableFuture<TaskManagerLocation>> inputLocations =
new HashSet<>(getTotalNumberOfParallelSubtasks());
- // go over all inputs
- for (int i = 0; i < inputEdges.length; i++) {
+ // go over all ConsumedPartitionGroups
+ for (ConsumedPartitionGroup consumedPartitionGroup :
allConsumedPartitions) {
inputLocations.clear();
- ExecutionEdge[] sources = inputEdges[i];
- if (sources != null) {
- // go over all input sources
- for (int k = 0; k < sources.length; k++) {
+ if (consumedPartitionGroup != null) {
Review comment:
Why would the `consumedPartitionGroup` ever be `null`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+ private final Map<IntermediateResultPartitionID,
List<ConsumerVertexGroup>> partitionConsumers =
+ new HashMap<>();
+
+ private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>>
vertexConsumedPartitions =
+ new HashMap<>();
+
+ public void addPartitionConsumers(
Review comment:
Maybe `connectPartitionWithConsumerVertexGroup` is a slightly better
name.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for building {@link EdgeManager}. */
+public class EdgeManagerBuildUtil {
+
+ /**
+ * Calculate the connections between {@link ExecutionJobVertex} and {@link
IntermediateResult}
+ * based on the {@link DistributionPattern}. The connection information is
stored in the {@link
+ * EdgeManager}.
+ */
+ public static void connectVertexToResult(
+ ExecutionJobVertex vertex,
+ IntermediateResult ires,
Review comment:
```suggestion
IntermediateResult intermediateResult,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -93,21 +89,12 @@ void resetForNewExecution() {
hasDataProduced = false;
}
- int addConsumerGroup() {
- int pos = consumers.size();
-
- // NOTE: currently we support only one consumer per result!!!
- if (pos != 0) {
- throw new RuntimeException(
- "Currently, each intermediate result can only have one
consumer.");
- }
-
- consumers.add(new ArrayList<ExecutionEdge>());
- return pos;
+ public void addConsumers(ConsumerVertexGroup consumers) {
+ getEdgeManager().addPartitionConsumers(partitionId, consumers);
}
- void addConsumer(ExecutionEdge edge, int consumerNumber) {
- consumers.get(consumerNumber).add(edge);
+ private EdgeManager getEdgeManager() {
+ return producer.getExecutionGraph().getEdgeManager();
}
Review comment:
I think this shows that we are overly coupling the
`IntermediateResultPartition` with the `ExecutionGraph`. Couldn't we give an
`EdgeManager` to the `IntermediateResultPartition` when we create it? This
makes the dependency explicit.
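A rough sketch of what passing the dependency in at construction time could look like. `SimpleEdgeManager` and `SimplePartition` are hypothetical, heavily simplified stand-ins (string ids instead of `IntermediateResultPartitionID` and `ConsumerVertexGroup`); the real constructor signature would need to fit the existing code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the actual Flink classes.
class InjectionSketch {

    static class SimpleEdgeManager {
        private final Map<String, List<String>> partitionConsumers = new HashMap<>();

        void connect(String partitionId, String consumerGroup) {
            partitionConsumers
                    .computeIfAbsent(partitionId, id -> new ArrayList<>())
                    .add(consumerGroup);
        }

        List<String> consumersOf(String partitionId) {
            return partitionConsumers.getOrDefault(partitionId, Collections.emptyList());
        }
    }

    static class SimplePartition {
        private final String partitionId;
        // The dependency is handed in explicitly instead of being looked up
        // through producer -> execution graph -> edge manager.
        private final SimpleEdgeManager edgeManager;

        SimplePartition(String partitionId, SimpleEdgeManager edgeManager) {
            this.partitionId = partitionId;
            this.edgeManager = edgeManager;
        }

        void addConsumers(String consumerGroup) {
            edgeManager.connect(partitionId, consumerGroup);
        }
    }

    public static void main(String[] args) {
        SimpleEdgeManager edgeManager = new SimpleEdgeManager();
        SimplePartition partition = new SimplePartition("p-0", edgeManager);
        partition.addConsumers("group-A");
        System.out.println(edgeManager.consumersOf("p-0"));
    }
}
```

One side effect of this shape is that a partition can be unit-tested with a standalone edge manager, without building an execution graph first.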
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** Group of consumed {@link IntermediateResultPartitionID}s. */
+public class ConsumedPartitionGroup implements
Iterable<IntermediateResultPartitionID> {
+ private final List<IntermediateResultPartitionID> resultPartitions;
+
+ private ConsumedPartitionGroup(List<IntermediateResultPartitionID>
resultPartitions) {
+ this.resultPartitions = resultPartitions;
+ }
+
+ public static ConsumedPartitionGroup fromMultiplePartitions(
+ List<IntermediateResultPartitionID> resultPartitions) {
+ return new ConsumedPartitionGroup(resultPartitions);
+ }
+
+ public static ConsumedPartitionGroup fromSinglePartition(
+ IntermediateResultPartitionID resultPartition) {
+ return new
ConsumedPartitionGroup(Collections.singletonList(resultPartition));
+ }
+
+ @Override
+ public Iterator<IntermediateResultPartitionID> iterator() {
+ return resultPartitions.iterator();
+ }
+
+ public int size() {
+ return resultPartitions.size();
+ }
+
+ public boolean isEmpty() {
+ return resultPartitions.isEmpty();
+ }
+
+ public IntermediateResultPartitionID getFirst() {
+ return iterator().next();
+ }
Review comment:
This method is only used in tests. Hence, we might make this method
package-private and add `@VisibleForTesting`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for building {@link EdgeManager}. */
+public class EdgeManagerBuildUtil {
+
+ public static void connectVertexToResult(
+ ExecutionJobVertex vertex,
+ IntermediateResult ires,
+ int inputNumber,
+ DistributionPattern distributionPattern) {
+
+ switch (distributionPattern) {
+ case POINTWISE:
+ connectPointwise(vertex.getTaskVertices(), ires, inputNumber);
+ break;
+ case ALL_TO_ALL:
+ connectAllToAll(vertex.getTaskVertices(), ires, inputNumber);
+ break;
+ default:
+ throw new RuntimeException("Unrecognized distribution
pattern.");
+ }
+ }
+
+ private static void connectAllToAll(
+ ExecutionVertex[] taskVertices, IntermediateResult ires, int
inputNumber) {
+
+ ConsumedPartitionGroup consumedPartitions =
+ new ConsumedPartitionGroup(
+ Arrays.stream(ires.getPartitions())
+
.map(IntermediateResultPartition::getPartitionId)
+ .collect(Collectors.toList()));
+ for (ExecutionVertex ev : taskVertices) {
+ ev.addConsumedPartitions(consumedPartitions, inputNumber);
+ }
+
+ ConsumerVertexGroup vertices =
+ new ConsumerVertexGroup(
+ Arrays.stream(taskVertices)
+ .map(ExecutionVertex::getID)
+ .collect(Collectors.toList()));
+ for (IntermediateResultPartition partition : ires.getPartitions()) {
+ partition.addConsumers(vertices);
+ }
+ }
+
+ private static void connectPointwise(
+ ExecutionVertex[] taskVertices, IntermediateResult ires, int
inputNumber) {
+
+ final int sourceCount = ires.getPartitions().length;
+ final int targetCount = taskVertices.length;
+
+ if (sourceCount == targetCount) {
+ for (int i = 0; i < sourceCount; i++) {
+ ExecutionVertex executionVertex = taskVertices[i];
+ IntermediateResultPartition partition =
ires.getPartitions()[i];
+
+ ConsumerVertexGroup consumerVertexGroup =
+ new ConsumerVertexGroup(executionVertex.getID());
+ partition.addConsumers(consumerVertexGroup);
+
+ ConsumedPartitionGroup consumedPartitionGroup =
+ new ConsumedPartitionGroup(partition.getPartitionId());
+ executionVertex.addConsumedPartitions(consumedPartitionGroup,
inputNumber);
+ }
+ } else if (sourceCount > targetCount) {
+ for (int index = 0; index < targetCount; index++) {
+
+ ExecutionVertex executionVertex = taskVertices[index];
+ ConsumerVertexGroup consumerVertexGroup =
+ new ConsumerVertexGroup(executionVertex.getID());
+
+ int start = index * sourceCount / targetCount;
+ int end = (index + 1) * sourceCount / targetCount;
+
+ List<IntermediateResultPartitionID> consumedPartitions =
+ new ArrayList<>(end - start);
+
+ for (int i = start; i < end; i++) {
+ IntermediateResultPartition partition =
ires.getPartitions()[i];
+ partition.addConsumers(consumerVertexGroup);
+
+ consumedPartitions.add(partition.getPartitionId());
+ }
+
+ ConsumedPartitionGroup consumedPartitionGroup =
+ new ConsumedPartitionGroup(consumedPartitions);
+ executionVertex.addConsumedPartitions(consumedPartitionGroup,
inputNumber);
+ }
+ } else {
+ for (int partitionNum = 0; partitionNum < sourceCount;
partitionNum++) {
+
+ IntermediateResultPartition partition =
ires.getPartitions()[partitionNum];
+ ConsumedPartitionGroup consumerPartitionGroup =
+ new ConsumedPartitionGroup(partition.getPartitionId());
+
+ float factor = ((float) targetCount) / sourceCount;
+ int start = (int) (Math.ceil(partitionNum * factor));
+ int end = (int) (Math.ceil((partitionNum + 1) * factor));
Review comment:
Why is it important that a) uses `Math.floor` and b) uses `Math.ceil`?
Shouldn't this be irrelevant for the overall correctness?
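One way to probe this question is a small standalone check, sketched below. It splits `total` indices into `groups` contiguous ranges once with rounding down and once with rounding up, and verifies that each variant covers every index exactly once. The class and method names are hypothetical, and the ceil is computed with integer arithmetic rather than the `float` factor used in the PR, so this is only an approximation of the code under review. It shows that both roundings give a valid split, but not whether other code depends on the exact assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the PR.
class PointwiseRangeSketch {

    // [start, end) for group i when rounding down (plain integer division).
    static int[] floorRange(int i, int total, int groups) {
        return new int[] {i * total / groups, (i + 1) * total / groups};
    }

    // [start, end) for group i when rounding up (integer ceil of i * total / groups).
    static int[] ceilRange(int i, int total, int groups) {
        return new int[] {
            (i * total + groups - 1) / groups, ((i + 1) * total + groups - 1) / groups
        };
    }

    static List<int[]> allRanges(int total, int groups, boolean roundUp) {
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            ranges.add(roundUp ? ceilRange(i, total, groups) : floorRange(i, total, groups));
        }
        return ranges;
    }

    // True if the ranges are contiguous, start at 0 and end at total.
    static boolean coversExactlyOnce(List<int[]> ranges, int total) {
        int next = 0;
        for (int[] range : ranges) {
            if (range[0] != next || range[1] < range[0]) {
                return false;
            }
            next = range[1];
        }
        return next == total;
    }

    public static void main(String[] args) {
        int total = 5;
        int groups = 2;
        System.out.println(coversExactlyOnce(allRanges(total, groups, false), total));
        System.out.println(coversExactlyOnce(allRanges(total, groups, true), total));
    }
}
```

For 5 indices and 2 groups, rounding down yields the ranges [0, 2) and [2, 5), while rounding up yields [0, 3) and [3, 5). Both are complete, non-overlapping splits; they differ only in which group receives the extra element.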
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
##########
@@ -202,19 +199,24 @@ public ExecutionVertexID getID() {
}
public int getNumberOfInputs() {
- return this.inputEdges.length;
+ return getAllConsumedPartitions().size();
+ }
+
+ public List<ConsumedPartitionGroup> getAllConsumedPartitions() {
Review comment:
nit: Should we rather call this method
`getAllConsumedPartitionGroups`? Likewise `getConsumedPartitionGroup` instead
of `getConsumedPartitions`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+ private final Map<IntermediateResultPartitionID,
List<ConsumerVertexGroup>> partitionConsumers =
+ new HashMap<>();
+
+ private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>>
vertexConsumedPartitions =
+ new HashMap<>();
+
+ public void addPartitionConsumers(
+ IntermediateResultPartitionID resultPartitionId,
ConsumerVertexGroup consumerVertices) {
+
+ checkState(!partitionConsumers.containsKey(resultPartitionId));
+
+ final List<ConsumerVertexGroup> consumers =
+ getPartitionConsumersInternal(resultPartitionId);
+
+ // sanity check
+ checkState(
+ consumers.isEmpty(), "Currently there has to be exactly one
consumer in real jobs");
+
+ consumers.add(consumerVertices);
+ }
+
+ public void addVertexConsumedPartitions(
+ ExecutionVertexID executionVertexId, ConsumedPartitionGroup
partitions) {
Review comment:
```suggestion
ExecutionVertexID executionVertexId, ConsumedPartitionGroup
consumedPartitionGroup) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+ private final Map<IntermediateResultPartitionID,
List<ConsumerVertexGroup>> partitionConsumers =
+ new HashMap<>();
+
+ private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>>
vertexConsumedPartitions =
+ new HashMap<>();
+
+ public void addPartitionConsumers(
+ IntermediateResultPartitionID resultPartitionId,
ConsumerVertexGroup consumerVertices) {
+
+ checkState(!partitionConsumers.containsKey(resultPartitionId));
+
+ final List<ConsumerVertexGroup> consumers =
+ getPartitionConsumersInternal(resultPartitionId);
+
+ // sanity check
+ checkState(
+ consumers.isEmpty(), "Currently there has to be exactly one
consumer in real jobs");
+
+ consumers.add(consumerVertices);
+ }
+
+ public void addVertexConsumedPartitions(
+ ExecutionVertexID executionVertexId, ConsumedPartitionGroup
partitions) {
+
+ final List<ConsumedPartitionGroup> consumedPartitions =
+ getVertexConsumedPartitionsInternal(executionVertexId);
+
+ consumedPartitions.add(partitions);
+ }
+
+ private List<ConsumerVertexGroup> getPartitionConsumersInternal(
+ IntermediateResultPartitionID resultPartitionId) {
+ return partitionConsumers.computeIfAbsent(resultPartitionId, id -> new
ArrayList<>());
+ }
+
+ private List<ConsumedPartitionGroup> getVertexConsumedPartitionsInternal(
+ ExecutionVertexID executionVertexId) {
+ return vertexConsumedPartitions.computeIfAbsent(executionVertexId, id
-> new ArrayList<>());
+ }
+
+ public List<ConsumerVertexGroup> getPartitionConsumers(
+ IntermediateResultPartitionID resultPartitionId) {
+ return
Collections.unmodifiableList(getPartitionConsumersInternal(resultPartitionId));
+ }
+
+ public List<ConsumedPartitionGroup> getVertexConsumedPartitions(
Review comment:
```suggestion
public List<ConsumedPartitionGroup> getConsumedPartitionGroupsForVertex(
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+ private final Map<IntermediateResultPartitionID,
List<ConsumerVertexGroup>> partitionConsumers =
+ new HashMap<>();
+
+ private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>>
vertexConsumedPartitions =
+ new HashMap<>();
+
+ public void addPartitionConsumers(
+ IntermediateResultPartitionID resultPartitionId,
ConsumerVertexGroup consumerVertices) {
+
+ checkState(!partitionConsumers.containsKey(resultPartitionId));
+
+ final List<ConsumerVertexGroup> consumers =
+ getPartitionConsumersInternal(resultPartitionId);
+
+ // sanity check
+ checkState(
+ consumers.isEmpty(), "Currently there has to be exactly one
consumer in real jobs");
+
+ consumers.add(consumerVertices);
+ }
+
+ public void addVertexConsumedPartitions(
Review comment:
Maybe `connectVertexWithConsumedPartitionGroup` is a slightly better
name.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+ private final Map<IntermediateResultPartitionID,
List<ConsumerVertexGroup>> partitionConsumers =
+ new HashMap<>();
+
+ private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>>
vertexConsumedPartitions =
+ new HashMap<>();
+
+ public void addPartitionConsumers(
+ IntermediateResultPartitionID resultPartitionId,
ConsumerVertexGroup consumerVertices) {
Review comment:
```suggestion
IntermediateResultPartitionID resultPartitionId,
ConsumerVertexGroup consumerVertexGroup) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+ private final Map<IntermediateResultPartitionID,
List<ConsumerVertexGroup>> partitionConsumers =
+ new HashMap<>();
+
+ private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>>
vertexConsumedPartitions =
+ new HashMap<>();
+
+ public void addPartitionConsumers(
+ IntermediateResultPartitionID resultPartitionId,
ConsumerVertexGroup consumerVertices) {
+
+ checkState(!partitionConsumers.containsKey(resultPartitionId));
+
+ final List<ConsumerVertexGroup> consumers =
+ getPartitionConsumersInternal(resultPartitionId);
+
+ // sanity check
+ checkState(
+ consumers.isEmpty(), "Currently there has to be exactly one
consumer in real jobs");
+
+ consumers.add(consumerVertices);
+ }
+
+ public void addVertexConsumedPartitions(
+ ExecutionVertexID executionVertexId, ConsumedPartitionGroup
partitions) {
+
+ final List<ConsumedPartitionGroup> consumedPartitions =
+ getVertexConsumedPartitionsInternal(executionVertexId);
+
+ consumedPartitions.add(partitions);
+ }
+
+ private List<ConsumerVertexGroup> getPartitionConsumersInternal(
+ IntermediateResultPartitionID resultPartitionId) {
+ return partitionConsumers.computeIfAbsent(resultPartitionId, id -> new
ArrayList<>());
+ }
+
+ private List<ConsumedPartitionGroup> getVertexConsumedPartitionsInternal(
+ ExecutionVertexID executionVertexId) {
+ return vertexConsumedPartitions.computeIfAbsent(executionVertexId, id
-> new ArrayList<>());
+ }
+
+ public List<ConsumerVertexGroup> getPartitionConsumers(
Review comment:
```suggestion
public List<ConsumerVertexGroup> getConsumerVertexGroupsForPartition(
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for building {@link EdgeManager}. */
+public class EdgeManagerBuildUtil {
+
+ /**
+ * Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult}
+ * based on the {@link DistributionPattern}. The connection information is stored in the {@link
+ * EdgeManager}.
+ */
+ public static void connectVertexToResult(
+ ExecutionJobVertex vertex,
+ IntermediateResult ires,
+ DistributionPattern distributionPattern) {
+
+ switch (distributionPattern) {
+ case POINTWISE:
+ connectPointwise(vertex.getTaskVertices(), ires);
+ break;
+ case ALL_TO_ALL:
+ connectAllToAll(vertex.getTaskVertices(), ires);
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized distribution pattern.");
+ }
+ }
+
+ private static void connectAllToAll(ExecutionVertex[] taskVertices, IntermediateResult ires) {
+
+ ConsumedPartitionGroup consumedPartitions =
+ ConsumedPartitionGroup.fromMultiplePartitions(
+ Arrays.stream(ires.getPartitions())
+ .map(IntermediateResultPartition::getPartitionId)
+ .collect(Collectors.toList()));
+ for (ExecutionVertex ev : taskVertices) {
+ ev.addConsumedPartitions(consumedPartitions);
+ }
+
+ ConsumerVertexGroup vertices =
+ ConsumerVertexGroup.fromMultipleVertices(
+ Arrays.stream(taskVertices)
+ .map(ExecutionVertex::getID)
+ .collect(Collectors.toList()));
+ for (IntermediateResultPartition partition : ires.getPartitions()) {
+ partition.addConsumers(vertices);
+ }
+ }
+
+ private static void connectPointwise(ExecutionVertex[] taskVertices, IntermediateResult ires) {
Review comment:
```suggestion
private static void connectPointwise(ExecutionVertex[] executionVertices, IntermediateResult intermediateResult) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -30,30 +30,26 @@
private final ExecutionVertex producer;
- private final int partitionNumber;
-
private final IntermediateResultPartitionID partitionId;
- private List<List<ExecutionEdge>> consumers;
-
/** Whether this partition has produced some data. */
private boolean hasDataProduced = false;
public IntermediateResultPartition(
IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
this.totalResult = totalResult;
this.producer = producer;
- this.partitionNumber = partitionNumber;
- this.consumers = new ArrayList<List<ExecutionEdge>>(0);
this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
+
+ producer.getExecutionGraph().registerResultPartition(partitionId, this);
Review comment:
I am not a huge fan of coupling components through this kind of construct.
Couldn't we register the `IntermediateResultPartition` where it is created
(e.g. in the `ExecutionVertex`)?
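A rough sketch of what I have in mind, not a drop-in change. `PartitionRegistry`, `Partition` and `Vertex` are hypothetical, simplified stand-ins for `ExecutionGraph`, `IntermediateResultPartition` and `ExecutionVertex`; only the `registerResultPartition` name is taken from the diff. The point is that the constructor no longer reaches back into the graph, and the code that creates the partition does the registration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the registry role that ExecutionGraph plays in the diff.
final class PartitionRegistry {
    private final Map<String, Partition> partitions = new HashMap<>();

    void registerResultPartition(String partitionId, Partition partition) {
        partitions.put(partitionId, partition);
    }

    Partition get(String partitionId) {
        return partitions.get(partitionId);
    }
}

// Hypothetical stand-in for IntermediateResultPartition.
final class Partition {
    private final String id;

    // The constructor only initializes its own state; it has no side effects
    // on other components.
    Partition(String resultId, int partitionNumber) {
        this.id = resultId + "#" + partitionNumber;
    }

    String getId() {
        return id;
    }
}

// Hypothetical stand-in for ExecutionVertex, i.e. the place where partitions are created.
final class Vertex {
    private final PartitionRegistry registry;

    Vertex(PartitionRegistry registry) {
        this.registry = registry;
    }

    Partition createPartition(String resultId, int partitionNumber) {
        Partition partition = new Partition(resultId, partitionNumber);
        // Registration happens at the creation site, not inside the constructor.
        registry.registerResultPartition(partition.getId(), partition);
        return partition;
    }
}
```

With this shape the partition can be constructed in a test without a fully wired graph behind the producer.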
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for building {@link EdgeManager}. */
+public class EdgeManagerBuildUtil {
+
+ /**
+ * Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult}
+ * based on the {@link DistributionPattern}. The connection information is stored in the {@link
+ * EdgeManager}.
+ */
+ public static void connectVertexToResult(
+ ExecutionJobVertex vertex,
+ IntermediateResult ires,
+ DistributionPattern distributionPattern) {
+
+ switch (distributionPattern) {
+ case POINTWISE:
+ connectPointwise(vertex.getTaskVertices(), ires);
+ break;
+ case ALL_TO_ALL:
+ connectAllToAll(vertex.getTaskVertices(), ires);
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized distribution pattern.");
+ }
+ }
+
+ private static void connectAllToAll(ExecutionVertex[] taskVertices, IntermediateResult ires) {
Review comment:
```suggestion
private static void connectAllToAll(ExecutionVertex[] executionVertices, IntermediateResult intermediateResult) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
##########
@@ -321,107 +323,11 @@ public ExecutionGraph getExecutionGraph() {
// Graph building
// --------------------------------------------------------------------------------------------
- public void connectSource(
- int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
-
- final DistributionPattern pattern = edge.getDistributionPattern();
- final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
-
- ExecutionEdge[] edges;
-
- switch (pattern) {
- case POINTWISE:
- edges = connectPointwise(sourcePartitions, inputNumber);
- break;
-
- case ALL_TO_ALL:
- edges = connectAllToAll(sourcePartitions, inputNumber);
- break;
-
- default:
- throw new RuntimeException("Unrecognized distribution pattern.");
- }
-
- inputEdges[inputNumber] = edges;
-
- // add the consumers to the source
- // for now (until the receiver initiated handshake is in place), we need to register the
- // edges as the execution graph
- for (ExecutionEdge ee : edges) {
- ee.getSource().addConsumer(ee, consumerNumber);
- }
- }
-
- private ExecutionEdge[] connectAllToAll(
- IntermediateResultPartition[] sourcePartitions, int inputNumber) {
- ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
-
- for (int i = 0; i < sourcePartitions.length; i++) {
- IntermediateResultPartition irp = sourcePartitions[i];
- edges[i] = new ExecutionEdge(irp, this, inputNumber);
- }
-
- return edges;
- }
-
- private ExecutionEdge[] connectPointwise(
- IntermediateResultPartition[] sourcePartitions, int inputNumber) {
- final int numSources = sourcePartitions.length;
- final int parallelism = getTotalNumberOfParallelSubtasks();
-
- // simple case same number of sources as targets
- if (numSources == parallelism) {
- return new ExecutionEdge[] {
- new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber)
- };
- } else if (numSources < parallelism) {
-
- int sourcePartition;
-
- // check if the pattern is regular or irregular
- // we use int arithmetics for regular, and floating point with rounding for irregular
- if (parallelism % numSources == 0) {
- // same number of targets per source
- int factor = parallelism / numSources;
- sourcePartition = subTaskIndex / factor;
- } else {
- // different number of targets per source
- float factor = ((float) parallelism) / numSources;
- sourcePartition = (int) (subTaskIndex / factor);
- }
-
- return new ExecutionEdge[] {
- new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber)
- };
- } else {
- if (numSources % parallelism == 0) {
- // same number of targets per source
- int factor = numSources / parallelism;
- int startIndex = subTaskIndex * factor;
-
- ExecutionEdge[] edges = new ExecutionEdge[factor];
- for (int i = 0; i < factor; i++) {
- edges[i] =
- new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
- }
- return edges;
- } else {
- float factor = ((float) numSources) / parallelism;
+ public void addConsumedPartitions(ConsumedPartitionGroup consumedPartitions) {
Review comment:
Maybe better to call this method `addConsumedPartitionGroup`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** Group of consumer {@link ExecutionVertexID}s. */
+public class ConsumerVertexGroup implements Iterable<ExecutionVertexID> {
+ private final List<ExecutionVertexID> vertices;
+
+ public ConsumerVertexGroup(List<ExecutionVertexID> vertices) {
Review comment:
Can the constructors be private since we have the factory methods?
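To illustrate, a trimmed-down sketch of the shape I would expect. `fromMultipleVertices` is the factory already used in `EdgeManagerBuildUtil`; the name `fromSingleVertex`, the `size()` helper and the minimal `ExecutionVertexID` stand-in are assumptions made only so that the snippet compiles on its own:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Minimal stand-in for the real ExecutionVertexID, for illustration only.
final class ExecutionVertexID {
    private final String name;

    ExecutionVertexID(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

/** Group of consumer {@link ExecutionVertexID}s. */
final class ConsumerVertexGroup implements Iterable<ExecutionVertexID> {
    private final List<ExecutionVertexID> vertices;

    // Private: instances can only be obtained through the factory methods below.
    private ConsumerVertexGroup(List<ExecutionVertexID> vertices) {
        this.vertices = vertices;
    }

    static ConsumerVertexGroup fromMultipleVertices(List<ExecutionVertexID> vertices) {
        return new ConsumerVertexGroup(vertices);
    }

    // Assumed name for the single-vertex factory.
    static ConsumerVertexGroup fromSingleVertex(ExecutionVertexID vertex) {
        return new ConsumerVertexGroup(Collections.singletonList(vertex));
    }

    @Override
    public Iterator<ExecutionVertexID> iterator() {
        return vertices.iterator();
    }

    // Helper added for the sketch only.
    int size() {
        return vertices.size();
    }
}
```

Keeping a single private constructor also means the single-vertex case is expressed once, in the factory, instead of in a second public constructor.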
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** Group of consumer {@link ExecutionVertexID}s. */
+public class ConsumerVertexGroup implements Iterable<ExecutionVertexID> {
+ private final List<ExecutionVertexID> vertices;
+
+ public ConsumerVertexGroup(List<ExecutionVertexID> vertices) {
+ this.vertices = vertices;
+ }
+
+ public ConsumerVertexGroup(ExecutionVertexID vertex) {
Review comment:
Same here.