This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6a321777a53b8d7807078cb8d98c7b3de242fed Author: Weijie Guo <[email protected]> AuthorDate: Mon Oct 31 16:14:24 2022 +0800 [FLINK-29767] VertexwiseSchedulingStrategy supports hybrid shuffle edge. This closes #21199 --- .../strategy/VertexwiseSchedulingStrategy.java | 89 ++++++++++++++++------ .../strategy/VertexwiseSchedulingStrategyTest.java | 34 +++++++++ 2 files changed, 99 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java index 74c47234a3a..6524e9fbfc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java @@ -26,8 +26,8 @@ import org.apache.flink.util.IterableUtils; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,9 +38,8 @@ import static org.apache.flink.util.Preconditions.checkState; /** * {@link SchedulingStrategy} instance which schedules tasks in granularity of vertex (which - * indicates this strategy only supports ALL_EDGES_BLOCKING batch jobs). Note that this strategy - * implements {@link SchedulingTopologyListener}, so it can handle the updates of scheduling - * topology. + * indicates this strategy only supports batch jobs). Note that this strategy implements {@link + * SchedulingTopologyListener}, so it can handle the updates of scheduling topology. 
*/ public class VertexwiseSchedulingStrategy implements SchedulingStrategy, SchedulingTopologyListener { @@ -114,8 +113,6 @@ public class VertexwiseSchedulingStrategy } private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) { - final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>(); - Set<ExecutionVertexID> allCandidates; if (newVertices.isEmpty()) { allCandidates = vertices; @@ -125,31 +122,75 @@ public class VertexwiseSchedulingStrategy newVertices.clear(); } - final Set<ExecutionVertexID> verticesToDeploy = - allCandidates.stream() - .filter( - vertexId -> { - SchedulingExecutionVertex vertex = - schedulingTopology.getVertex(vertexId); - checkState(vertex.getState() == ExecutionState.CREATED); - return inputConsumableDecider.isInputConsumable( - vertex, Collections.emptySet(), consumableStatusCache); - }) - .collect(Collectors.toSet()); + final Set<ExecutionVertexID> verticesToSchedule = new HashSet<>(); + + Set<ExecutionVertexID> nextVertices = allCandidates; + while (!nextVertices.isEmpty()) { + nextVertices = addToScheduleAndGetVertices(nextVertices, verticesToSchedule); + } + + scheduleVerticesOneByOne(verticesToSchedule); + scheduledVertices.addAll(verticesToSchedule); + } + + private Set<ExecutionVertexID> addToScheduleAndGetVertices( + Set<ExecutionVertexID> currentVertices, Set<ExecutionVertexID> verticesToSchedule) { + Set<ExecutionVertexID> nextVertices = new HashSet<>(); + // cache a consumedPartitionGroup's consumable status to avoid computing it repeatedly. 
+ final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new IdentityHashMap<>(); + final Set<ConsumerVertexGroup> visitedConsumerVertexGroup = + Collections.newSetFromMap(new IdentityHashMap<>()); + + for (ExecutionVertexID currentVertex : currentVertices) { + if (isVertexSchedulable(currentVertex, consumableStatusCache, verticesToSchedule)) { + verticesToSchedule.add(currentVertex); + Set<ConsumerVertexGroup> canBePipelinedConsumerVertexGroups = + IterableUtils.toStream( + schedulingTopology + .getVertex(currentVertex) + .getProducedResults()) + .map(SchedulingResultPartition::getConsumerVertexGroups) + .flatMap(Collection::stream) + .filter( + (consumerVertexGroup) -> + consumerVertexGroup + .getResultPartitionType() + .canBePipelinedConsumed()) + .collect(Collectors.toSet()); + for (ConsumerVertexGroup consumerVertexGroup : canBePipelinedConsumerVertexGroups) { + if (!visitedConsumerVertexGroup.contains(consumerVertexGroup)) { + visitedConsumerVertexGroup.add(consumerVertexGroup); + nextVertices.addAll( + IterableUtils.toStream(consumerVertexGroup) + .collect(Collectors.toSet())); + } + } + } + } + return nextVertices; + } - scheduleVerticesOneByOne(verticesToDeploy); - scheduledVertices.addAll(verticesToDeploy); + private boolean isVertexSchedulable( + final ExecutionVertexID vertex, + final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache, + final Set<ExecutionVertexID> verticesToSchedule) { + return !verticesToSchedule.contains(vertex) + && !scheduledVertices.contains(vertex) + && inputConsumableDecider.isInputConsumable( + schedulingTopology.getVertex(vertex), + verticesToSchedule, + consumableStatusCache); } - private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToDeploy) { - if (verticesToDeploy.isEmpty()) { + private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToSchedule) { + if (verticesToSchedule.isEmpty()) { return; } - final List<ExecutionVertexID> sortedVerticesToDeploy = + final 
List<ExecutionVertexID> sortedVerticesToSchedule = SchedulingStrategyUtils.sortExecutionVerticesInTopologicalOrder( - schedulingTopology, verticesToDeploy); + schedulingTopology, verticesToSchedule); - sortedVerticesToDeploy.forEach( + sortedVerticesToSchedule.forEach( id -> schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(id))); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java index 6b679b73e29..ea0023ddac1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java @@ -184,6 +184,40 @@ class VertexwiseSchedulingStrategyTest { expectedScheduledVertices, testingSchedulerOperation); } + @Test + void testScheduleDownstreamOfHybridEdge() { + final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers = + topology.addExecutionVertices().withParallelism(2).finish(); + + final List<TestingSchedulingExecutionVertex> consumers = + topology.addExecutionVertices().withParallelism(2).finish(); + + // add consumers to scheduling strategy. 
+ topology.connectAllToAll(producers, consumers) + .withResultPartitionType(ResultPartitionType.HYBRID_FULL) + .finish(); + + final VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(topology); + inputConsumableDecider.addSourceVertices(new HashSet<>(producers)); + + inputConsumableDecider.setInputConsumable(consumers.get(0)); + inputConsumableDecider.setInputConsumable(consumers.get(1)); + + schedulingStrategy.startScheduling(); + + // consumers being properly scheduled indicates that the consuming relationship and + // correlation are successfully built + assertLatestScheduledVerticesAreEqualTo( + Arrays.asList( + Collections.singletonList(producers.get(0)), + Collections.singletonList(producers.get(1)), + Collections.singletonList(consumers.get(0)), + Collections.singletonList(consumers.get(1))), + testingSchedulerOperation); + } + @Test void testUpdateStrategyWithAllToAll() { testUpdateStrategyOnTopologyUpdate(true);
