This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4deaf6edc152d06488f8738386b6a8b7544fe5e9
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Aug 5 14:55:38 2022 +0800

    [FLINK-28799] PipelinedRegionSchedulingStrategy supports all 
resultPartitionType.
    
    This closes #20487
---
 .../adapter/DefaultSchedulingPipelinedRegion.java  |  18 +--
 .../PipelinedRegionSchedulingStrategy.java         | 140 ++++++++++++++-------
 .../strategy/SchedulingPipelinedRegion.java        |   2 +-
 .../DefaultSchedulingPipelinedRegionTest.java      |   7 +-
 .../PipelinedRegionSchedulingStrategyTest.java     | 124 +++++++++++++++++-
 .../strategy/TestingSchedulingPipelinedRegion.java |   2 +-
 6 files changed, 232 insertions(+), 61 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
index 98fab2e24b9..007f8bf5632 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
@@ -40,7 +40,7 @@ public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
 
     private final Map<ExecutionVertexID, DefaultExecutionVertex> 
executionVertices;
 
-    private Set<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+    private Set<ConsumedPartitionGroup> nonPipelinedConsumedPartitionGroups;
 
     private Set<ConsumedPartitionGroup> 
releaseBySchedulerConsumedPartitionGroups;
 
@@ -78,7 +78,7 @@ public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
     }
 
     private void initializeConsumedPartitionGroups() {
-        final Set<ConsumedPartitionGroup> blockingConsumedPartitionGroupSet = 
new HashSet<>();
+        final Set<ConsumedPartitionGroup> 
nonPipelinedConsumedPartitionGroupSet = new HashSet<>();
         final Set<ConsumedPartitionGroup> 
releaseBySchedulerConsumedPartitionGroupSet =
                 new HashSet<>();
         for (DefaultExecutionVertex executionVertex : 
executionVertices.values()) {
@@ -87,8 +87,8 @@ public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
                 SchedulingResultPartition consumedPartition =
                         
resultPartitionRetriever.apply(consumedPartitionGroup.getFirst());
 
-                if 
(!consumedPartition.getResultType().canBePipelinedConsumed()) {
-                    
blockingConsumedPartitionGroupSet.add(consumedPartitionGroup);
+                if 
(!consumedPartition.getResultType().mustBePipelinedConsumed()) {
+                    
nonPipelinedConsumedPartitionGroupSet.add(consumedPartitionGroup);
                 }
                 if (consumedPartition.getResultType().isReleaseByScheduler()) {
                     
releaseBySchedulerConsumedPartitionGroupSet.add(consumedPartitionGroup);
@@ -96,18 +96,18 @@ public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
             }
         }
 
-        this.blockingConsumedPartitionGroups =
-                Collections.unmodifiableSet(blockingConsumedPartitionGroupSet);
+        this.nonPipelinedConsumedPartitionGroups =
+                
Collections.unmodifiableSet(nonPipelinedConsumedPartitionGroupSet);
         this.releaseBySchedulerConsumedPartitionGroups =
                 
Collections.unmodifiableSet(releaseBySchedulerConsumedPartitionGroupSet);
     }
 
     @Override
-    public Iterable<ConsumedPartitionGroup> 
getAllBlockingConsumedPartitionGroups() {
-        if (blockingConsumedPartitionGroups == null) {
+    public Iterable<ConsumedPartitionGroup> 
getAllNonPipelinedConsumedPartitionGroups() {
+        if (nonPipelinedConsumedPartitionGroups == null) {
             initializeConsumedPartitionGroups();
         }
-        return blockingConsumedPartitionGroups;
+        return nonPipelinedConsumedPartitionGroups;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index 2ace05c67fa..134f1eda5d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -53,6 +53,10 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
     private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> 
regionVerticesSorted =
             new IdentityHashMap<>();
 
+    /** All ConsumedPartitionGroups of one schedulingPipelinedRegion. */
+    private final Map<SchedulingPipelinedRegion, Set<ConsumedPartitionGroup>>
+            consumedPartitionGroupsOfRegion = new IdentityHashMap<>();
+
     /** The ConsumedPartitionGroups which are produced by multiple regions. */
     private final Set<ConsumedPartitionGroup> 
crossRegionConsumedPartitionGroups =
             Collections.newSetFromMap(new IdentityHashMap<>());
@@ -75,6 +79,8 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
 
         initPartitionGroupConsumerRegions();
 
+        initConsumedPartitionGroupsOfRegion();
+
         for (SchedulingExecutionVertex vertex : 
schedulingTopology.getVertices()) {
             final SchedulingPipelinedRegion region =
                     
schedulingTopology.getPipelinedRegionOfVertex(vertex.getId());
@@ -84,6 +90,21 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
         }
     }
 
+    private void initConsumedPartitionGroupsOfRegion() {
+        for (SchedulingPipelinedRegion region : 
schedulingTopology.getAllPipelinedRegions()) {
+            Set<ConsumedPartitionGroup> consumedPartitionGroupsSetOfRegion = 
new HashSet<>();
+            for (SchedulingExecutionVertex executionVertex : 
region.getVertices()) {
+                consumedPartitionGroupsSetOfRegion.addAll(
+                        
IterableUtils.toStream(executionVertex.getProducedResults())
+                                .flatMap(
+                                        partition ->
+                                                
partition.getConsumedPartitionGroups().stream())
+                                .collect(Collectors.toSet()));
+            }
+            consumedPartitionGroupsOfRegion.put(region, 
consumedPartitionGroupsSetOfRegion);
+        }
+    }
+
     private void initCrossRegionConsumedPartitionGroups() {
         final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
                 producerRegionsByConsumedPartitionGroup = new 
IdentityHashMap<>();
@@ -91,7 +112,7 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
         for (SchedulingPipelinedRegion pipelinedRegion :
                 schedulingTopology.getAllPipelinedRegions()) {
             for (ConsumedPartitionGroup consumedPartitionGroup :
-                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                    
pipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) {
                 producerRegionsByConsumedPartitionGroup.computeIfAbsent(
                         consumedPartitionGroup, 
this::getProducerRegionsForConsumedPartitionGroup);
             }
@@ -100,7 +121,7 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
         for (SchedulingPipelinedRegion pipelinedRegion :
                 schedulingTopology.getAllPipelinedRegions()) {
             for (ConsumedPartitionGroup consumedPartitionGroup :
-                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                    
pipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) {
                 final Set<SchedulingPipelinedRegion> producerRegions =
                         
producerRegionsByConsumedPartitionGroup.get(consumedPartitionGroup);
                 if (producerRegions.size() > 1 && 
producerRegions.contains(pipelinedRegion)) {
@@ -128,7 +149,7 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
     private void initPartitionGroupConsumerRegions() {
         for (SchedulingPipelinedRegion region : 
schedulingTopology.getAllPipelinedRegions()) {
             for (ConsumedPartitionGroup consumedPartitionGroup :
-                    region.getAllBlockingConsumedPartitionGroups()) {
+                    region.getAllNonPipelinedConsumedPartitionGroups()) {
                 if 
(crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)
                         || 
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                     partitionGroupConsumerRegions
@@ -139,6 +160,18 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
         }
     }
 
+    private Set<SchedulingPipelinedRegion> getDownstreamRegionsOfVertex(
+            SchedulingExecutionVertex executionVertex) {
+        return IterableUtils.toStream(executionVertex.getProducedResults())
+                .flatMap(partition -> 
partition.getConsumedPartitionGroups().stream())
+                .flatMap(
+                        partitionGroup ->
+                                partitionGroupConsumerRegions
+                                        .getOrDefault(partitionGroup, 
Collections.emptySet())
+                                        .stream())
+                .collect(Collectors.toSet());
+    }
+
     @Override
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
@@ -150,7 +183,7 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
 
     private boolean isSourceRegion(SchedulingPipelinedRegion region) {
         for (ConsumedPartitionGroup consumedPartitionGroup :
-                region.getAllBlockingConsumedPartitionGroups()) {
+                region.getAllNonPipelinedConsumedPartitionGroups()) {
             if 
(crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)
                     || 
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                 return false;
@@ -173,32 +206,8 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
     public void onExecutionStateChange(
             final ExecutionVertexID executionVertexId, final ExecutionState 
executionState) {
         if (executionState == ExecutionState.FINISHED) {
-            final Set<ConsumedPartitionGroup> finishedConsumedPartitionGroups =
-                    IterableUtils.toStream(
-                                    schedulingTopology
-                                            .getVertex(executionVertexId)
-                                            .getProducedResults())
-                            .filter(
-                                    partition ->
-                                            partition.getState() == 
ResultPartitionState.CONSUMABLE)
-                            .flatMap(partition -> 
partition.getConsumedPartitionGroups().stream())
-                            .filter(
-                                    group ->
-                                            
crossRegionConsumedPartitionGroups.contains(group)
-                                                    || 
group.areAllPartitionsFinished())
-                            .collect(Collectors.toSet());
-
-            final Set<SchedulingPipelinedRegion> consumerRegions =
-                    finishedConsumedPartitionGroups.stream()
-                            .flatMap(
-                                    partitionGroup ->
-                                            partitionGroupConsumerRegions
-                                                    .getOrDefault(
-                                                            partitionGroup, 
Collections.emptySet())
-                                                    .stream())
-                            .collect(Collectors.toSet());
-
-            maybeScheduleRegions(consumerRegions);
+            maybeScheduleRegions(
+                    
getDownstreamRegionsOfVertex(schedulingTopology.getVertex(executionVertexId)));
         }
     }
 
@@ -211,17 +220,34 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
                         schedulingTopology, regions);
 
         final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new 
HashMap<>();
+        final Set<SchedulingPipelinedRegion> downstreamSchedulableRegions = 
new HashSet<>();
         for (SchedulingPipelinedRegion region : regionsSorted) {
-            maybeScheduleRegion(region, consumableStatusCache);
+            if (maybeScheduleRegion(region, consumableStatusCache)) {
+                downstreamSchedulableRegions.addAll(
+                        consumedPartitionGroupsOfRegion.getOrDefault(region, 
Collections.emptySet())
+                                .stream()
+                                .flatMap(
+                                        consumedPartitionGroups ->
+                                                partitionGroupConsumerRegions
+                                                        .getOrDefault(
+                                                                
consumedPartitionGroups,
+                                                                
Collections.emptySet())
+                                                        .stream())
+                                .collect(Collectors.toSet()));
+            }
+        }
+
+        if (!downstreamSchedulableRegions.isEmpty()) {
+            maybeScheduleRegions(downstreamSchedulableRegions);
         }
     }
 
-    private void maybeScheduleRegion(
+    private boolean maybeScheduleRegion(
             final SchedulingPipelinedRegion region,
             final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
         if (scheduledRegions.contains(region)
                 || !areRegionInputsAllConsumable(region, 
consumableStatusCache)) {
-            return;
+            return false;
         }
 
         checkState(
@@ -230,20 +256,23 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
 
         
schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
         scheduledRegions.add(region);
+        return true;
     }
 
     private boolean areRegionInputsAllConsumable(
             final SchedulingPipelinedRegion region,
             final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
         for (ConsumedPartitionGroup consumedPartitionGroup :
-                region.getAllBlockingConsumedPartitionGroups()) {
+                region.getAllNonPipelinedConsumedPartitionGroups()) {
             if 
(crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)) {
-                if 
(!isCrossRegionConsumedPartitionConsumable(consumedPartitionGroup, region)) {
+                if (!isDownstreamOfCrossRegionConsumedPartitionSchedulable(
+                        consumedPartitionGroup, region)) {
                     return false;
                 }
             } else if 
(isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                 if (!consumableStatusCache.computeIfAbsent(
-                        consumedPartitionGroup, 
this::isConsumedPartitionGroupConsumable)) {
+                        consumedPartitionGroup,
+                        this::isDownstreamConsumedPartitionGroupSchedulable)) {
                     return false;
                 }
             }
@@ -251,25 +280,42 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
         return true;
     }
 
-    private boolean isConsumedPartitionGroupConsumable(
+    private boolean isDownstreamConsumedPartitionGroupSchedulable(
             final ConsumedPartitionGroup consumedPartitionGroup) {
-        for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
-            if (schedulingTopology.getResultPartition(partitionId).getState()
-                    != ResultPartitionState.CONSUMABLE) {
-                return false;
+        if 
(consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                if 
(!scheduledRegions.contains(getProducerRegion(partitionId))) {
+                    return false;
+                }
+            }
+        } else {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                if 
(schedulingTopology.getResultPartition(partitionId).getState()
+                        != ResultPartitionState.CONSUMABLE) {
+                    return false;
+                }
             }
         }
         return true;
     }
 
-    private boolean isCrossRegionConsumedPartitionConsumable(
+    private boolean isDownstreamOfCrossRegionConsumedPartitionSchedulable(
             final ConsumedPartitionGroup consumedPartitionGroup,
             final SchedulingPipelinedRegion pipelinedRegion) {
-        for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
-            if (isExternalConsumedPartition(partitionId, pipelinedRegion)
-                    && 
schedulingTopology.getResultPartition(partitionId).getState()
-                            != ResultPartitionState.CONSUMABLE) {
-                return false;
+        if 
(consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                if (isExternalConsumedPartition(partitionId, pipelinedRegion)
+                        && 
!scheduledRegions.contains(getProducerRegion(partitionId))) {
+                    return false;
+                }
+            }
+        } else {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                if (isExternalConsumedPartition(partitionId, pipelinedRegion)
+                        && 
schedulingTopology.getResultPartition(partitionId).getState()
+                                != ResultPartitionState.CONSUMABLE) {
+                    return false;
+                }
             }
         }
         return true;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
index 69fd8299deb..ae5be2f210d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
@@ -35,7 +35,7 @@ public interface SchedulingPipelinedRegion
      *
      * @return set of {@link ConsumedPartitionGroup}s
      */
-    Iterable<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups();
+    Iterable<ConsumedPartitionGroup> 
getAllNonPipelinedConsumedPartitionGroups();
 
     /**
      * Get all distinct releaseByScheduler {@link ConsumedPartitionGroup}s.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
index 949f039727c..147c91094fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
@@ -147,7 +147,7 @@ public class DefaultSchedulingPipelinedRegionTest extends 
TestLogger {
         final Set<IntermediateResultPartitionID> 
secondPipelinedRegionConsumedResults =
                 new HashSet<>();
         for (ConsumedPartitionGroup consumedPartitionGroup :
-                secondPipelinedRegion.getAllBlockingConsumedPartitionGroups()) 
{
+                
secondPipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) {
             for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
                 if (!secondPipelinedRegion.contains(
                         
topology.getResultPartition(partitionId).getProducer().getId())) {
@@ -157,7 +157,10 @@ public class DefaultSchedulingPipelinedRegionTest extends 
TestLogger {
         }
 
         assertThat(
-                
firstPipelinedRegion.getAllBlockingConsumedPartitionGroups().iterator().hasNext(),
+                firstPipelinedRegion
+                        .getAllNonPipelinedConsumedPartitionGroups()
+                        .iterator()
+                        .hasNext(),
                 is(false));
         assertThat(secondPipelinedRegionConsumedResults, 
contains(b0ConsumedResultPartition));
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
index 34395e02999..c99b816d532 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
@@ -267,7 +267,7 @@ class PipelinedRegionSchedulingStrategyTest {
     }
 
     @Test
-    void testSchedulingTopologyWithCrossRegionConsumedPartitionGroups() throws 
Exception {
+    void 
testSchedulingTopologyWithBlockingCrossRegionConsumedPartitionGroups() throws 
Exception {
         final JobVertex v1 = createJobVertex("v1", 4);
         final JobVertex v2 = createJobVertex("v2", 3);
         final JobVertex v3 = createJobVertex("v3", 2);
@@ -339,6 +339,74 @@ class PipelinedRegionSchedulingStrategyTest {
         }
     }
 
+    @Test
+    void testSchedulingTopologyWithHybridCrossRegionConsumedPartitionGroups() 
throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 4);
+        final JobVertex v2 = createJobVertex("v2", 3);
+        final JobVertex v3 = createJobVertex("v3", 2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, 
ResultPartitionType.HYBRID);
+        v3.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, 
v3));
+        final JobGraph jobGraph =
+                
JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = 
executionGraph.getSchedulingTopology();
+
+        // Test whether the topology is built correctly
+        final List<SchedulingPipelinedRegion> regions = new ArrayList<>();
+        schedulingTopology.getAllPipelinedRegions().forEach(regions::add);
+        assertThat(regions).hasSize(2);
+
+        final ExecutionVertex v31 = 
executionGraph.getJobVertex(v3.getID()).getTaskVertices()[0];
+
+        final Set<ExecutionVertexID> region1 = new HashSet<>();
+        schedulingTopology
+                .getPipelinedRegionOfVertex(v31.getID())
+                .getVertices()
+                .forEach(vertex -> region1.add(vertex.getId()));
+        assertThat(region1).hasSize(5);
+
+        final ExecutionVertex v32 = 
executionGraph.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        final Set<ExecutionVertexID> region2 = new HashSet<>();
+        schedulingTopology
+                .getPipelinedRegionOfVertex(v32.getID())
+                .getVertices()
+                .forEach(vertex -> region2.add(vertex.getId()));
+        assertThat(region2).hasSize(4);
+
+        startScheduling(schedulingTopology);
+
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
+
+        // Test whether region 1 is scheduled correctly
+        final List<ExecutionVertexID> scheduledVertices1 =
+                testingSchedulerOperation.getScheduledVertices().get(0);
+        assertThat(scheduledVertices1).hasSize(5);
+
+        for (ExecutionVertexID vertexId : scheduledVertices1) {
+            assertThat(region1).contains(vertexId);
+        }
+
+        // Test whether region 2 is scheduled correctly
+        final List<ExecutionVertexID> scheduledVertices2 =
+                testingSchedulerOperation.getScheduledVertices().get(1);
+        assertThat(scheduledVertices2).hasSize(4);
+
+        for (ExecutionVertexID vertexId : scheduledVertices2) {
+            assertThat(region2).contains(vertexId);
+        }
+    }
+
     @Test
     void testScheduleBlockingDownstreamTaskIndividually() throws Exception {
         final JobVertex v1 = createJobVertex("v1", 2);
@@ -397,6 +465,60 @@ class PipelinedRegionSchedulingStrategyTest {
         
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
     }
 
+    /**
+     * Source and it's downstream with hybrid edge will be scheduled. When 
blocking result partition
+     * finished, it's downstream will be scheduled.
+     *
+     * <pre>
+     * V1 ----> V2 ----> V3 ----> V4
+     *     |        |         |
+     *  Hybrid   Blocking   Hybrid
+     * </pre>
+     */
+    @Test
+    void testScheduleTopologyWithHybridAndBlockingEdge() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 1);
+        final JobVertex v2 = createJobVertex("v2", 1);
+        final JobVertex v3 = createJobVertex("v3", 1);
+        final JobVertex v4 = createJobVertex("v4", 1);
+
+        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.HYBRID);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+        v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, 
ResultPartitionType.HYBRID);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, 
v3, v4));
+        final JobGraph jobGraph =
+                
JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = 
executionGraph.getSchedulingTopology();
+
+        PipelinedRegionSchedulingStrategy schedulingStrategy = 
startScheduling(schedulingTopology);
+
+        // v1 & v2 will be scheduled as v1 is a source and v1 -> v2 is a 
hybrid downstream.
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
+        final ExecutionVertex v11 = 
executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0];
+        final ExecutionVertex v21 = 
executionGraph.getJobVertex(v2.getID()).getTaskVertices()[0];
+        assertThat(testingSchedulerOperation.getScheduledVertices().get(0))
+                .containsExactly(v11.getID());
+        assertThat(testingSchedulerOperation.getScheduledVertices().get(1))
+                .containsExactly(v21.getID());
+        // finish v2 to trigger new round of scheduling.
+        v21.finishAllBlockingPartitions();
+        schedulingStrategy.onExecutionStateChange(v21.getID(), 
ExecutionState.FINISHED);
+        
assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
+        final ExecutionVertex v31 = 
executionGraph.getJobVertex(v3.getID()).getTaskVertices()[0];
+        final ExecutionVertex v41 = 
executionGraph.getJobVertex(v4.getID()).getTaskVertices()[0];
+        assertThat(testingSchedulerOperation.getScheduledVertices().get(2))
+                .containsExactly(v31.getID());
+        assertThat(testingSchedulerOperation.getScheduledVertices().get(3))
+                .containsExactly(v41.getID());
+    }
+
     /** Inner non-pipelined edge will not affect it's region be scheduled. */
     @Test
     void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
index f18b2513739..aecb5f0a2fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
@@ -82,7 +82,7 @@ public class TestingSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
     }
 
     @Override
-    public Iterable<ConsumedPartitionGroup> 
getAllBlockingConsumedPartitionGroups() {
+    public Iterable<ConsumedPartitionGroup> 
getAllNonPipelinedConsumedPartitionGroups() {
         return Collections.unmodifiableSet(blockingConsumedPartitionGroups);
     }
 

Reply via email to