This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6a321777a53b8d7807078cb8d98c7b3de242fed
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Oct 31 16:14:24 2022 +0800

    [FLINK-29767] VertexwiseSchedulingStrategy supports hybrid shuffle edge.
    
    This closes #21199
---
 .../strategy/VertexwiseSchedulingStrategy.java     | 89 ++++++++++++++++------
 .../strategy/VertexwiseSchedulingStrategyTest.java | 34 +++++++++
 2 files changed, 99 insertions(+), 24 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
index 74c47234a3a..6524e9fbfc0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -26,8 +26,8 @@ import org.apache.flink.util.IterableUtils;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,9 +38,8 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * {@link SchedulingStrategy} instance which schedules tasks in granularity of 
vertex (which
- * indicates this strategy only supports ALL_EDGES_BLOCKING batch jobs). Note 
that this strategy
- * implements {@link SchedulingTopologyListener}, so it can handle the updates 
of scheduling
- * topology.
+ * indicates this strategy only supports batch jobs). Note that this strategy 
implements {@link
+ * SchedulingTopologyListener}, so it can handle the updates of scheduling 
topology.
  */
 public class VertexwiseSchedulingStrategy
         implements SchedulingStrategy, SchedulingTopologyListener {
@@ -114,8 +113,6 @@ public class VertexwiseSchedulingStrategy
     }
 
     private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) {
-        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new 
HashMap<>();
-
         Set<ExecutionVertexID> allCandidates;
         if (newVertices.isEmpty()) {
             allCandidates = vertices;
@@ -125,31 +122,75 @@ public class VertexwiseSchedulingStrategy
             newVertices.clear();
         }
 
-        final Set<ExecutionVertexID> verticesToDeploy =
-                allCandidates.stream()
-                        .filter(
-                                vertexId -> {
-                                    SchedulingExecutionVertex vertex =
-                                            
schedulingTopology.getVertex(vertexId);
-                                    checkState(vertex.getState() == 
ExecutionState.CREATED);
-                                    return 
inputConsumableDecider.isInputConsumable(
-                                            vertex, Collections.emptySet(), 
consumableStatusCache);
-                                })
-                        .collect(Collectors.toSet());
+        final Set<ExecutionVertexID> verticesToSchedule = new HashSet<>();
+
+        Set<ExecutionVertexID> nextVertices = allCandidates;
+        while (!nextVertices.isEmpty()) {
+            nextVertices = addToScheduleAndGetVertices(nextVertices, 
verticesToSchedule);
+        }
+
+        scheduleVerticesOneByOne(verticesToSchedule);
+        scheduledVertices.addAll(verticesToSchedule);
+    }
+
+    private Set<ExecutionVertexID> addToScheduleAndGetVertices(
+            Set<ExecutionVertexID> currentVertices, Set<ExecutionVertexID> 
verticesToSchedule) {
+        Set<ExecutionVertexID> nextVertices = new HashSet<>();
+        // cache consumedPartitionGroup's consumable status to avoid computing it 
repeatedly.
+        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new 
IdentityHashMap<>();
+        final Set<ConsumerVertexGroup> visitedConsumerVertexGroup =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (ExecutionVertexID currentVertex : currentVertices) {
+            if (isVertexSchedulable(currentVertex, consumableStatusCache, 
verticesToSchedule)) {
+                verticesToSchedule.add(currentVertex);
+                Set<ConsumerVertexGroup> canBePipelinedConsumerVertexGroups =
+                        IterableUtils.toStream(
+                                        schedulingTopology
+                                                .getVertex(currentVertex)
+                                                .getProducedResults())
+                                
.map(SchedulingResultPartition::getConsumerVertexGroups)
+                                .flatMap(Collection::stream)
+                                .filter(
+                                        (consumerVertexGroup) ->
+                                                consumerVertexGroup
+                                                        
.getResultPartitionType()
+                                                        
.canBePipelinedConsumed())
+                                .collect(Collectors.toSet());
+                for (ConsumerVertexGroup consumerVertexGroup : 
canBePipelinedConsumerVertexGroups) {
+                    if 
(!visitedConsumerVertexGroup.contains(consumerVertexGroup)) {
+                        visitedConsumerVertexGroup.add(consumerVertexGroup);
+                        nextVertices.addAll(
+                                IterableUtils.toStream(consumerVertexGroup)
+                                        .collect(Collectors.toSet()));
+                    }
+                }
+            }
+        }
+        return nextVertices;
+    }
 
-        scheduleVerticesOneByOne(verticesToDeploy);
-        scheduledVertices.addAll(verticesToDeploy);
+    private boolean isVertexSchedulable(
+            final ExecutionVertexID vertex,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache,
+            final Set<ExecutionVertexID> verticesToSchedule) {
+        return !verticesToSchedule.contains(vertex)
+                && !scheduledVertices.contains(vertex)
+                && inputConsumableDecider.isInputConsumable(
+                        schedulingTopology.getVertex(vertex),
+                        verticesToSchedule,
+                        consumableStatusCache);
     }
 
-    private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> 
verticesToDeploy) {
-        if (verticesToDeploy.isEmpty()) {
+    private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> 
verticesToSchedule) {
+        if (verticesToSchedule.isEmpty()) {
             return;
         }
-        final List<ExecutionVertexID> sortedVerticesToDeploy =
+        final List<ExecutionVertexID> sortedVerticesToSchedule =
                 
SchedulingStrategyUtils.sortExecutionVerticesInTopologicalOrder(
-                        schedulingTopology, verticesToDeploy);
+                        schedulingTopology, verticesToSchedule);
 
-        sortedVerticesToDeploy.forEach(
+        sortedVerticesToSchedule.forEach(
                 id -> 
schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(id)));
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
index 6b679b73e29..ea0023ddac1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
@@ -184,6 +184,40 @@ class VertexwiseSchedulingStrategyTest {
                 expectedScheduledVertices, testingSchedulerOperation);
     }
 
+    @Test
+    void testScheduleDownstreamOfHybridEdge() {
+        final TestingSchedulingTopology topology = new 
TestingSchedulingTopology();
+
+        final List<TestingSchedulingExecutionVertex> producers =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        final List<TestingSchedulingExecutionVertex> consumers =
+                topology.addExecutionVertices().withParallelism(2).finish();
+
+        // connect producers to consumers via a HYBRID_FULL result partition edge.
+        topology.connectAllToAll(producers, consumers)
+                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
+                .finish();
+
+        final VertexwiseSchedulingStrategy schedulingStrategy = 
createSchedulingStrategy(topology);
+        inputConsumableDecider.addSourceVertices(new HashSet<>(producers));
+
+        inputConsumableDecider.setInputConsumable(consumers.get(0));
+        inputConsumableDecider.setInputConsumable(consumers.get(1));
+
+        schedulingStrategy.startScheduling();
+
+        // the consumers being properly scheduled indicates that the consuming 
relationship and
+        // correlation are successfully built
+        assertLatestScheduledVerticesAreEqualTo(
+                Arrays.asList(
+                        Collections.singletonList(producers.get(0)),
+                        Collections.singletonList(producers.get(1)),
+                        Collections.singletonList(consumers.get(0)),
+                        Collections.singletonList(consumers.get(1))),
+                testingSchedulerOperation);
+    }
+
     @Test
     void testUpdateStrategyWithAllToAll() {
         testUpdateStrategyOnTopologyUpdate(true);

Reply via email to