tillrohrmann commented on a change in pull request #14868:
URL: https://github.com/apache/flink/pull/14868#discussion_r588280881



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1339,6 +1367,16 @@ public void deregisterExecution(Execution exec) {
         }
     }
 
+    private void registerExecutionVerticesAndResultPartitions(
+            List<ExecutionJobVertex> executionJobVertices) {
+        for (ExecutionJobVertex executionJobVertex : executionJobVertices) {
+            for (ExecutionVertex executionVertex : 
executionJobVertex.getTaskVertices()) {
+                executionVerticesById.put(executionVertex.getID(), 
executionVertex);
+                
resultPartitionsById.putAll(executionVertex.getProducedPartitions());
+            }
+        }
+    }

Review comment:
       Nice, this is a very good solution :-)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Class that manages all the connections between tasks. */
+public class EdgeManager {
+
+    private final Map<IntermediateResultPartitionID, 
List<ConsumerVertexGroup>> partitionConsumers =
+            new HashMap<>();
+
+    private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> 
vertexConsumedPartitions =
+            new HashMap<>();
+
+    public void connectPartitionWithConsumerVertexGroup(
+            IntermediateResultPartitionID resultPartitionId,
+            ConsumerVertexGroup consumerVertexGroup) {
+
+        checkNotNull(consumerVertexGroup);
+
+        checkState(!partitionConsumers.containsKey(resultPartitionId));
+
+        final List<ConsumerVertexGroup> consumers =
+                getConsumerVertexGroupsForPartitionInternal(resultPartitionId);
+
+        // sanity check
+        checkState(
+                consumers.isEmpty(), "Currently there has to be exactly one 
consumer in real jobs");

Review comment:
       This `checkState` and the one above seem to be testing the same thing. I 
would keep only one, ideally the one with an explanatory message.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
##########
@@ -712,5 +716,21 @@ public ExecutionDeploymentListener 
getExecutionDeploymentListener() {
 
         @Override
         public void notifyExecutionChange(Execution execution, ExecutionState 
newExecutionState) {}
+
+        @Override
+        public EdgeManager getEdgeManager() {
+            return null;
+        }
+
+        @Override
+        public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) 
{
+            return null;

Review comment:
       Let's implement this method by throwing an `UnsupportedOperationException` instead of returning `null`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
##########
@@ -712,5 +716,21 @@ public ExecutionDeploymentListener 
getExecutionDeploymentListener() {
 
         @Override
         public void notifyExecutionChange(Execution execution, ExecutionState 
newExecutionState) {}
+
+        @Override
+        public EdgeManager getEdgeManager() {
+            return null;

Review comment:
       Let's implement this method by throwing an `UnsupportedOperationException` instead of returning `null`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
##########
@@ -712,5 +716,21 @@ public ExecutionDeploymentListener 
getExecutionDeploymentListener() {
 
         @Override
         public void notifyExecutionChange(Execution execution, ExecutionState 
newExecutionState) {}
+
+        @Override
+        public EdgeManager getEdgeManager() {
+            return null;
+        }
+
+        @Override
+        public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) 
{
+            return null;
+        }
+
+        @Override
+        public IntermediateResultPartition getResultPartitionOrThrow(
+                IntermediateResultPartitionID id) {
+            return null;

Review comment:
       Let's implement this method by throwing an `UnsupportedOperationException` instead of returning `null`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for building {@link EdgeManager}. */
+public class EdgeManagerBuildUtil {
+
+    public static void connectVertexToResult(
+            ExecutionJobVertex vertex,
+            IntermediateResult ires,
+            int inputNumber,
+            DistributionPattern distributionPattern) {
+
+        switch (distributionPattern) {
+            case POINTWISE:
+                connectPointwise(vertex.getTaskVertices(), ires, inputNumber);
+                break;
+            case ALL_TO_ALL:
+                connectAllToAll(vertex.getTaskVertices(), ires, inputNumber);
+                break;
+            default:
+                throw new RuntimeException("Unrecognized distribution 
pattern.");
+        }
+    }
+
+    private static void connectAllToAll(
+            ExecutionVertex[] taskVertices, IntermediateResult ires, int 
inputNumber) {
+
+        ConsumedPartitionGroup consumedPartitions =
+                new ConsumedPartitionGroup(
+                        Arrays.stream(ires.getPartitions())
+                                
.map(IntermediateResultPartition::getPartitionId)
+                                .collect(Collectors.toList()));
+        for (ExecutionVertex ev : taskVertices) {
+            ev.addConsumedPartitions(consumedPartitions, inputNumber);
+        }
+
+        ConsumerVertexGroup vertices =
+                new ConsumerVertexGroup(
+                        Arrays.stream(taskVertices)
+                                .map(ExecutionVertex::getID)
+                                .collect(Collectors.toList()));
+        for (IntermediateResultPartition partition : ires.getPartitions()) {
+            partition.addConsumers(vertices);
+        }
+    }
+
+    private static void connectPointwise(
+            ExecutionVertex[] taskVertices, IntermediateResult ires, int 
inputNumber) {
+
+        final int sourceCount = ires.getPartitions().length;
+        final int targetCount = taskVertices.length;
+
+        if (sourceCount == targetCount) {
+            for (int i = 0; i < sourceCount; i++) {
+                ExecutionVertex executionVertex = taskVertices[i];
+                IntermediateResultPartition partition = 
ires.getPartitions()[i];
+
+                ConsumerVertexGroup consumerVertexGroup =
+                        new ConsumerVertexGroup(executionVertex.getID());
+                partition.addConsumers(consumerVertexGroup);
+
+                ConsumedPartitionGroup consumedPartitionGroup =
+                        new ConsumedPartitionGroup(partition.getPartitionId());
+                executionVertex.addConsumedPartitions(consumedPartitionGroup, 
inputNumber);
+            }
+        } else if (sourceCount > targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                ExecutionVertex executionVertex = taskVertices[index];
+                ConsumerVertexGroup consumerVertexGroup =
+                        new ConsumerVertexGroup(executionVertex.getID());
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                List<IntermediateResultPartitionID> consumedPartitions =
+                        new ArrayList<>(end - start);
+
+                for (int i = start; i < end; i++) {
+                    IntermediateResultPartition partition = 
ires.getPartitions()[i];
+                    partition.addConsumers(consumerVertexGroup);
+
+                    consumedPartitions.add(partition.getPartitionId());
+                }
+
+                ConsumedPartitionGroup consumedPartitionGroup =
+                        new ConsumedPartitionGroup(consumedPartitions);
+                executionVertex.addConsumedPartitions(consumedPartitionGroup, 
inputNumber);
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; 
partitionNum++) {
+
+                IntermediateResultPartition partition = 
ires.getPartitions()[partitionNum];
+                ConsumedPartitionGroup consumerPartitionGroup =
+                        new ConsumedPartitionGroup(partition.getPartitionId());
+
+                float factor = ((float) targetCount) / sourceCount;
+                int start = (int) (Math.ceil(partitionNum * factor));
+                int end = (int) (Math.ceil((partitionNum + 1) * factor));

Review comment:
       This makes sense. Thanks a lot for the clarification.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** Group of consumed {@link IntermediateResultPartitionID}s. */
+public class ConsumedPartitionGroup implements 
Iterable<IntermediateResultPartitionID> {
+    private final List<IntermediateResultPartitionID> resultPartitions;
+
+    private ConsumedPartitionGroup(List<IntermediateResultPartitionID> 
resultPartitions) {
+        this.resultPartitions = resultPartitions;
+    }
+
+    public static ConsumedPartitionGroup fromMultiplePartitions(
+            List<IntermediateResultPartitionID> resultPartitions) {
+        return new ConsumedPartitionGroup(resultPartitions);
+    }
+
+    public static ConsumedPartitionGroup fromSinglePartition(
+            IntermediateResultPartitionID resultPartition) {
+        return new 
ConsumedPartitionGroup(Collections.singletonList(resultPartition));
+    }
+
+    @Override
+    public Iterator<IntermediateResultPartitionID> iterator() {
+        return resultPartitions.iterator();
+    }
+
+    public int size() {
+        return resultPartitions.size();
+    }
+
+    public boolean isEmpty() {
+        return resultPartitions.isEmpty();
+    }
+
+    public IntermediateResultPartitionID getFirst() {
+        return iterator().next();
+    }

Review comment:
       Alright.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to