This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4deaf6edc152d06488f8738386b6a8b7544fe5e9 Author: Weijie Guo <[email protected]> AuthorDate: Fri Aug 5 14:55:38 2022 +0800 [FLINK-28799] PipelinedRegionSchedulingStrategy supports all resultPartitionType. This closes #20487 --- .../adapter/DefaultSchedulingPipelinedRegion.java | 18 +-- .../PipelinedRegionSchedulingStrategy.java | 140 ++++++++++++++------- .../strategy/SchedulingPipelinedRegion.java | 2 +- .../DefaultSchedulingPipelinedRegionTest.java | 7 +- .../PipelinedRegionSchedulingStrategyTest.java | 124 +++++++++++++++++- .../strategy/TestingSchedulingPipelinedRegion.java | 2 +- 6 files changed, 232 insertions(+), 61 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java index 98fab2e24b9..007f8bf5632 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java @@ -40,7 +40,7 @@ public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegi private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVertices; - private Set<ConsumedPartitionGroup> blockingConsumedPartitionGroups; + private Set<ConsumedPartitionGroup> nonPipelinedConsumedPartitionGroups; private Set<ConsumedPartitionGroup> releaseBySchedulerConsumedPartitionGroups; @@ -78,7 +78,7 @@ public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegi } private void initializeConsumedPartitionGroups() { - final Set<ConsumedPartitionGroup> blockingConsumedPartitionGroupSet = new HashSet<>(); + final Set<ConsumedPartitionGroup> nonPipelinedConsumedPartitionGroupSet = new HashSet<>(); final Set<ConsumedPartitionGroup> releaseBySchedulerConsumedPartitionGroupSet = new HashSet<>(); for 
(DefaultExecutionVertex executionVertex : executionVertices.values()) { @@ -87,8 +87,8 @@ public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegi SchedulingResultPartition consumedPartition = resultPartitionRetriever.apply(consumedPartitionGroup.getFirst()); - if (!consumedPartition.getResultType().canBePipelinedConsumed()) { - blockingConsumedPartitionGroupSet.add(consumedPartitionGroup); + if (!consumedPartition.getResultType().mustBePipelinedConsumed()) { + nonPipelinedConsumedPartitionGroupSet.add(consumedPartitionGroup); } if (consumedPartition.getResultType().isReleaseByScheduler()) { releaseBySchedulerConsumedPartitionGroupSet.add(consumedPartitionGroup); @@ -96,18 +96,18 @@ public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegi } } - this.blockingConsumedPartitionGroups = - Collections.unmodifiableSet(blockingConsumedPartitionGroupSet); + this.nonPipelinedConsumedPartitionGroups = + Collections.unmodifiableSet(nonPipelinedConsumedPartitionGroupSet); this.releaseBySchedulerConsumedPartitionGroups = Collections.unmodifiableSet(releaseBySchedulerConsumedPartitionGroupSet); } @Override - public Iterable<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups() { - if (blockingConsumedPartitionGroups == null) { + public Iterable<ConsumedPartitionGroup> getAllNonPipelinedConsumedPartitionGroups() { + if (nonPipelinedConsumedPartitionGroups == null) { initializeConsumedPartitionGroups(); } - return blockingConsumedPartitionGroups; + return nonPipelinedConsumedPartitionGroups; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java index 2ace05c67fa..134f1eda5d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java @@ -53,6 +53,10 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted = new IdentityHashMap<>(); + /** All ConsumedPartitionGroups of one schedulingPipelinedRegion. */ + private final Map<SchedulingPipelinedRegion, Set<ConsumedPartitionGroup>> + consumedPartitionGroupsOfRegion = new IdentityHashMap<>(); + /** The ConsumedPartitionGroups which are produced by multiple regions. */ private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups = Collections.newSetFromMap(new IdentityHashMap<>()); @@ -75,6 +79,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { initPartitionGroupConsumerRegions(); + initConsumedPartitionGroupsOfRegion(); + for (SchedulingExecutionVertex vertex : schedulingTopology.getVertices()) { final SchedulingPipelinedRegion region = schedulingTopology.getPipelinedRegionOfVertex(vertex.getId()); @@ -84,6 +90,21 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { } } + private void initConsumedPartitionGroupsOfRegion() { + for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) { + Set<ConsumedPartitionGroup> consumedPartitionGroupsSetOfRegion = new HashSet<>(); + for (SchedulingExecutionVertex executionVertex : region.getVertices()) { + consumedPartitionGroupsSetOfRegion.addAll( + IterableUtils.toStream(executionVertex.getProducedResults()) + .flatMap( + partition -> + partition.getConsumedPartitionGroups().stream()) + .collect(Collectors.toSet())); + } + consumedPartitionGroupsOfRegion.put(region, consumedPartitionGroupsSetOfRegion); + } + } + private void initCrossRegionConsumedPartitionGroups() { final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>> producerRegionsByConsumedPartitionGroup = new 
IdentityHashMap<>(); @@ -91,7 +112,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { for (SchedulingPipelinedRegion pipelinedRegion : schedulingTopology.getAllPipelinedRegions()) { for (ConsumedPartitionGroup consumedPartitionGroup : - pipelinedRegion.getAllBlockingConsumedPartitionGroups()) { + pipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) { producerRegionsByConsumedPartitionGroup.computeIfAbsent( consumedPartitionGroup, this::getProducerRegionsForConsumedPartitionGroup); } @@ -100,7 +121,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { for (SchedulingPipelinedRegion pipelinedRegion : schedulingTopology.getAllPipelinedRegions()) { for (ConsumedPartitionGroup consumedPartitionGroup : - pipelinedRegion.getAllBlockingConsumedPartitionGroups()) { + pipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) { final Set<SchedulingPipelinedRegion> producerRegions = producerRegionsByConsumedPartitionGroup.get(consumedPartitionGroup); if (producerRegions.size() > 1 && producerRegions.contains(pipelinedRegion)) { @@ -128,7 +149,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { private void initPartitionGroupConsumerRegions() { for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) { for (ConsumedPartitionGroup consumedPartitionGroup : - region.getAllBlockingConsumedPartitionGroups()) { + region.getAllNonPipelinedConsumedPartitionGroups()) { if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup) || isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) { partitionGroupConsumerRegions @@ -139,6 +160,18 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { } } + private Set<SchedulingPipelinedRegion> getDownstreamRegionsOfVertex( + SchedulingExecutionVertex executionVertex) { + return IterableUtils.toStream(executionVertex.getProducedResults()) + .flatMap(partition -> 
partition.getConsumedPartitionGroups().stream()) + .flatMap( + partitionGroup -> + partitionGroupConsumerRegions + .getOrDefault(partitionGroup, Collections.emptySet()) + .stream()) + .collect(Collectors.toSet()); + } + @Override public void startScheduling() { final Set<SchedulingPipelinedRegion> sourceRegions = @@ -150,7 +183,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { private boolean isSourceRegion(SchedulingPipelinedRegion region) { for (ConsumedPartitionGroup consumedPartitionGroup : - region.getAllBlockingConsumedPartitionGroups()) { + region.getAllNonPipelinedConsumedPartitionGroups()) { if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup) || isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) { return false; @@ -173,32 +206,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { public void onExecutionStateChange( final ExecutionVertexID executionVertexId, final ExecutionState executionState) { if (executionState == ExecutionState.FINISHED) { - final Set<ConsumedPartitionGroup> finishedConsumedPartitionGroups = - IterableUtils.toStream( - schedulingTopology - .getVertex(executionVertexId) - .getProducedResults()) - .filter( - partition -> - partition.getState() == ResultPartitionState.CONSUMABLE) - .flatMap(partition -> partition.getConsumedPartitionGroups().stream()) - .filter( - group -> - crossRegionConsumedPartitionGroups.contains(group) - || group.areAllPartitionsFinished()) - .collect(Collectors.toSet()); - - final Set<SchedulingPipelinedRegion> consumerRegions = - finishedConsumedPartitionGroups.stream() - .flatMap( - partitionGroup -> - partitionGroupConsumerRegions - .getOrDefault( - partitionGroup, Collections.emptySet()) - .stream()) - .collect(Collectors.toSet()); - - maybeScheduleRegions(consumerRegions); + maybeScheduleRegions( + getDownstreamRegionsOfVertex(schedulingTopology.getVertex(executionVertexId))); } } @@ -211,17 +220,34 @@ public 
class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { schedulingTopology, regions); final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>(); + final Set<SchedulingPipelinedRegion> downstreamSchedulableRegions = new HashSet<>(); for (SchedulingPipelinedRegion region : regionsSorted) { - maybeScheduleRegion(region, consumableStatusCache); + if (maybeScheduleRegion(region, consumableStatusCache)) { + downstreamSchedulableRegions.addAll( + consumedPartitionGroupsOfRegion.getOrDefault(region, Collections.emptySet()) + .stream() + .flatMap( + consumedPartitionGroups -> + partitionGroupConsumerRegions + .getOrDefault( + consumedPartitionGroups, + Collections.emptySet()) + .stream()) + .collect(Collectors.toSet())); + } + } + + if (!downstreamSchedulableRegions.isEmpty()) { + maybeScheduleRegions(downstreamSchedulableRegions); } } - private void maybeScheduleRegion( + private boolean maybeScheduleRegion( final SchedulingPipelinedRegion region, final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) { if (scheduledRegions.contains(region) || !areRegionInputsAllConsumable(region, consumableStatusCache)) { - return; + return false; } checkState( @@ -230,20 +256,23 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region)); scheduledRegions.add(region); + return true; } private boolean areRegionInputsAllConsumable( final SchedulingPipelinedRegion region, final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) { for (ConsumedPartitionGroup consumedPartitionGroup : - region.getAllBlockingConsumedPartitionGroups()) { + region.getAllNonPipelinedConsumedPartitionGroups()) { if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)) { - if (!isCrossRegionConsumedPartitionConsumable(consumedPartitionGroup, region)) { + if (!isDownstreamOfCrossRegionConsumedPartitionSchedulable( + consumedPartitionGroup, 
region)) { return false; } } else if (isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) { if (!consumableStatusCache.computeIfAbsent( - consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { + consumedPartitionGroup, + this::isDownstreamConsumedPartitionGroupSchedulable)) { return false; } } @@ -251,25 +280,42 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { return true; } - private boolean isConsumedPartitionGroupConsumable( + private boolean isDownstreamConsumedPartitionGroupSchedulable( final ConsumedPartitionGroup consumedPartitionGroup) { - for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { - if (schedulingTopology.getResultPartition(partitionId).getState() - != ResultPartitionState.CONSUMABLE) { - return false; + if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { + for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { + if (!scheduledRegions.contains(getProducerRegion(partitionId))) { + return false; + } + } + } else { + for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { + if (schedulingTopology.getResultPartition(partitionId).getState() + != ResultPartitionState.CONSUMABLE) { + return false; + } } } return true; } - private boolean isCrossRegionConsumedPartitionConsumable( + private boolean isDownstreamOfCrossRegionConsumedPartitionSchedulable( final ConsumedPartitionGroup consumedPartitionGroup, final SchedulingPipelinedRegion pipelinedRegion) { - for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { - if (isExternalConsumedPartition(partitionId, pipelinedRegion) - && schedulingTopology.getResultPartition(partitionId).getState() - != ResultPartitionState.CONSUMABLE) { - return false; + if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { + for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { + if 
(isExternalConsumedPartition(partitionId, pipelinedRegion) + && !scheduledRegions.contains(getProducerRegion(partitionId))) { + return false; + } + } + } else { + for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { + if (isExternalConsumedPartition(partitionId, pipelinedRegion) + && schedulingTopology.getResultPartition(partitionId).getState() + != ResultPartitionState.CONSUMABLE) { + return false; + } } } return true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java index 69fd8299deb..ae5be2f210d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java @@ -35,7 +35,7 @@ public interface SchedulingPipelinedRegion * * @return set of {@link ConsumedPartitionGroup}s */ - Iterable<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups(); + Iterable<ConsumedPartitionGroup> getAllNonPipelinedConsumedPartitionGroups(); /** * Get all distinct releaseByScheduler {@link ConsumedPartitionGroup}s. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java index 949f039727c..147c91094fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java @@ -147,7 +147,7 @@ public class DefaultSchedulingPipelinedRegionTest extends TestLogger { final Set<IntermediateResultPartitionID> secondPipelinedRegionConsumedResults = new HashSet<>(); for (ConsumedPartitionGroup consumedPartitionGroup : - secondPipelinedRegion.getAllBlockingConsumedPartitionGroups()) { + secondPipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) { for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { if (!secondPipelinedRegion.contains( topology.getResultPartition(partitionId).getProducer().getId())) { @@ -157,7 +157,10 @@ public class DefaultSchedulingPipelinedRegionTest extends TestLogger { } assertThat( - firstPipelinedRegion.getAllBlockingConsumedPartitionGroups().iterator().hasNext(), + firstPipelinedRegion + .getAllNonPipelinedConsumedPartitionGroups() + .iterator() + .hasNext(), is(false)); assertThat(secondPipelinedRegionConsumedResults, contains(b0ConsumedResultPartition)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java index 34395e02999..c99b816d532 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java @@ -267,7 
+267,7 @@ class PipelinedRegionSchedulingStrategyTest { } @Test - void testSchedulingTopologyWithCrossRegionConsumedPartitionGroups() throws Exception { + void testSchedulingTopologyWithBlockingCrossRegionConsumedPartitionGroups() throws Exception { final JobVertex v1 = createJobVertex("v1", 4); final JobVertex v2 = createJobVertex("v2", 3); final JobVertex v3 = createJobVertex("v3", 2); @@ -339,6 +339,74 @@ class PipelinedRegionSchedulingStrategyTest { } } + @Test + void testSchedulingTopologyWithHybridCrossRegionConsumedPartitionGroups() throws Exception { + final JobVertex v1 = createJobVertex("v1", 4); + final JobVertex v2 = createJobVertex("v2", 3); + final JobVertex v3 = createJobVertex("v3", 2); + + v2.connectNewDataSetAsInput( + v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID); + v3.connectNewDataSetAsInput( + v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3)); + final JobGraph jobGraph = + JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build(); + final ExecutionGraph executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .build(EXECUTOR_RESOURCE.getExecutor()); + + final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology(); + + // Test whether the topology is built correctly + final List<SchedulingPipelinedRegion> regions = new ArrayList<>(); + schedulingTopology.getAllPipelinedRegions().forEach(regions::add); + assertThat(regions).hasSize(2); + + final ExecutionVertex v31 = executionGraph.getJobVertex(v3.getID()).getTaskVertices()[0]; + + final Set<ExecutionVertexID> region1 = new HashSet<>(); + schedulingTopology + .getPipelinedRegionOfVertex(v31.getID()) + .getVertices() + .forEach(vertex -> region1.add(vertex.getId())); + assertThat(region1).hasSize(5); + + final 
ExecutionVertex v32 = executionGraph.getJobVertex(v3.getID()).getTaskVertices()[1]; + + final Set<ExecutionVertexID> region2 = new HashSet<>(); + schedulingTopology + .getPipelinedRegionOfVertex(v32.getID()) + .getVertices() + .forEach(vertex -> region2.add(vertex.getId())); + assertThat(region2).hasSize(4); + + startScheduling(schedulingTopology); + + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); + + // Test whether region 1 is scheduled correctly + final List<ExecutionVertexID> scheduledVertices1 = + testingSchedulerOperation.getScheduledVertices().get(0); + assertThat(scheduledVertices1).hasSize(5); + + for (ExecutionVertexID vertexId : scheduledVertices1) { + assertThat(region1).contains(vertexId); + } + + // Test whether region 2 is scheduled correctly + final List<ExecutionVertexID> scheduledVertices2 = + testingSchedulerOperation.getScheduledVertices().get(1); + assertThat(scheduledVertices2).hasSize(4); + + for (ExecutionVertexID vertexId : scheduledVertices2) { + assertThat(region2).contains(vertexId); + } + } + @Test void testScheduleBlockingDownstreamTaskIndividually() throws Exception { final JobVertex v1 = createJobVertex("v1", 2); @@ -397,6 +465,60 @@ class PipelinedRegionSchedulingStrategyTest { assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); } + /** + * Source and its downstream with hybrid edge will be scheduled. When blocking result partition + * finished, its downstream will be scheduled.
+ * + * <pre> + * V1 ----> V2 ----> V3 ----> V4 + * | | | + * Hybrid Blocking Hybrid + * </pre> + */ + @Test + void testScheduleTopologyWithHybridAndBlockingEdge() throws Exception { + final JobVertex v1 = createJobVertex("v1", 1); + final JobVertex v2 = createJobVertex("v2", 1); + final JobVertex v3 = createJobVertex("v3", 1); + final JobVertex v4 = createJobVertex("v4", 1); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID); + v3.connectNewDataSetAsInput( + v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID); + + final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4)); + final JobGraph jobGraph = + JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build(); + final ExecutionGraph executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .build(EXECUTOR_RESOURCE.getExecutor()); + + final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology(); + + PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology); + + // v1 & v2 will be scheduled as v1 is a source and v1 -> v2 is a hybrid downstream. + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); + final ExecutionVertex v11 = executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0]; + final ExecutionVertex v21 = executionGraph.getJobVertex(v2.getID()).getTaskVertices()[0]; + assertThat(testingSchedulerOperation.getScheduledVertices().get(0)) + .containsExactly(v11.getID()); + assertThat(testingSchedulerOperation.getScheduledVertices().get(1)) + .containsExactly(v21.getID()); + // finish v2 to trigger new round of scheduling. 
+ v21.finishAllBlockingPartitions(); + schedulingStrategy.onExecutionStateChange(v21.getID(), ExecutionState.FINISHED); + assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4); + final ExecutionVertex v31 = executionGraph.getJobVertex(v3.getID()).getTaskVertices()[0]; + final ExecutionVertex v41 = executionGraph.getJobVertex(v4.getID()).getTaskVertices()[0]; + assertThat(testingSchedulerOperation.getScheduledVertices().get(2)) + .containsExactly(v31.getID()); + assertThat(testingSchedulerOperation.getScheduledVertices().get(3)) + .containsExactly(v41.getID()); + } + /** Inner non-pipelined edge will not affect it's region be scheduled. */ @Test void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java index f18b2513739..aecb5f0a2fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java @@ -82,7 +82,7 @@ public class TestingSchedulingPipelinedRegion implements SchedulingPipelinedRegi } @Override - public Iterable<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups() { + public Iterable<ConsumedPartitionGroup> getAllNonPipelinedConsumedPartitionGroups() { return Collections.unmodifiableSet(blockingConsumedPartitionGroups); }
