Thesharing commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r602992946



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
##########
@@ -23,10 +23,19 @@
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.topology.PipelinedRegion;
 
+import java.util.List;
+
 /** Pipelined region on execution level, i.e., {@link ExecutionGraph} level. */
 public interface SchedulingPipelinedRegion
         extends PipelinedRegion<
                 ExecutionVertexID,
                 IntermediateResultPartitionID,
                 SchedulingExecutionVertex,
-                SchedulingResultPartition> {}
+                SchedulingResultPartition> {
+    /**
+     * Get all blocking {@link ConsumedPartitionGroup}s.
+     *
+     * @return list of {@link ConsumedPartitionGroup}s
+     */
+    List<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups();

Review comment:
       Resolved. I'm wondering whether we should change 
`SchedulingExecutionVertex#getConsumedPartitionGroups` and 
`SchedulingResultPartition#getConsumerVertexGroups` to `Iterable`, too.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 /** Default implementation of {@link SchedulingPipelinedRegion}. */
 public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegion {
 
     private final Map<ExecutionVertexID, DefaultExecutionVertex> 
executionVertices;
 
-    private Set<DefaultResultPartition> consumedResults;
+    private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+    private final Function<IntermediateResultPartitionID, 
DefaultResultPartition>
+            resultPartitionRetriever;
+
+    public DefaultSchedulingPipelinedRegion(
+            Set<DefaultExecutionVertex> defaultExecutionVertices,
+            Function<IntermediateResultPartitionID, DefaultResultPartition>
+                    resultPartitionRetriever) {
 
-    public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> 
defaultExecutionVertices) {
         Preconditions.checkNotNull(defaultExecutionVertices);
 
         this.executionVertices = new HashMap<>();
         for (DefaultExecutionVertex executionVertex : 
defaultExecutionVertices) {
             this.executionVertices.put(executionVertex.getId(), 
executionVertex);
         }
+
+        this.resultPartitionRetriever = resultPartitionRetriever;

Review comment:
       Thanks for the reminder. Added.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
     private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> 
regionVertices =
             new HashMap<>();
 
-    private final Set<TestingSchedulingResultPartition> consumedPartitions = 
new HashSet<>();
+    private final List<ConsumedPartitionGroup> consumedPartitionGroups = new 
ArrayList<>();

Review comment:
       Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
##########
@@ -31,7 +31,7 @@
 import java.util.stream.Collectors;
 
 /** Utils for {@link SchedulingStrategy}. */
-class SchedulingStrategyUtils {
+public class SchedulingStrategyUtils {

Review comment:
       Sorry, I forgot to revert this after removing the methods added in 
the previous commits. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {

Review comment:
       Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 /** Default implementation of {@link SchedulingPipelinedRegion}. */
 public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegion {
 
     private final Map<ExecutionVertexID, DefaultExecutionVertex> 
executionVertices;
 
-    private Set<DefaultResultPartition> consumedResults;
+    private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+    private final Function<IntermediateResultPartitionID, 
DefaultResultPartition>
+            resultPartitionRetriever;
+
+    public DefaultSchedulingPipelinedRegion(
+            Set<DefaultExecutionVertex> defaultExecutionVertices,
+            Function<IntermediateResultPartitionID, DefaultResultPartition>
+                    resultPartitionRetriever) {
 
-    public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> 
defaultExecutionVertices) {
         Preconditions.checkNotNull(defaultExecutionVertices);
 
         this.executionVertices = new HashMap<>();
         for (DefaultExecutionVertex executionVertex : 
defaultExecutionVertices) {
             this.executionVertices.put(executionVertex.getId(), 
executionVertex);
         }
+
+        this.resultPartitionRetriever = resultPartitionRetriever;
+    }
+
+    @VisibleForTesting
+    public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> 
defaultExecutionVertices) {

Review comment:
       Agreed. Removed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -60,23 +81,33 @@ public DefaultExecutionVertex getVertex(final 
ExecutionVertexID vertexId) {
         return executionVertex;
     }
 
-    @Override
-    public Iterable<DefaultResultPartition> getConsumedResults() {
-        if (consumedResults == null) {
-            initializeConsumedResults();
-        }
-        return consumedResults;
-    }
-
-    private void initializeConsumedResults() {
-        final Set<DefaultResultPartition> consumedResults = new HashSet<>();
+    private void initializeAllBlockingConsumedPartitionGroups() {
+        final Set<ConsumedPartitionGroup> consumedResultGroupSet = new 
HashSet<>();
         for (DefaultExecutionVertex executionVertex : 
executionVertices.values()) {
-            for (DefaultResultPartition resultPartition : 
executionVertex.getConsumedResults()) {
-                if 
(!executionVertices.containsKey(resultPartition.getProducer().getId())) {
-                    consumedResults.add(resultPartition);
+            for (ConsumedPartitionGroup consumedResultGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+                SchedulingResultPartition resultPartition =
+                        
resultPartitionRetriever.apply(consumedResultGroup.getFirst());
+                if (resultPartition.getResultType().isBlocking()) {
+                    consumedResultGroupSet.add(consumedResultGroup);
                 }
             }
         }
-        this.consumedResults = Collections.unmodifiableSet(consumedResults);
+
+        this.blockingConsumedPartitionGroups =

Review comment:
       Agreed. Resolved.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -47,12 +47,16 @@
 
     private ResultPartitionState state;
 
-    TestingSchedulingResultPartition(
-            IntermediateDataSetID dataSetID, ResultPartitionType type, 
ResultPartitionState state) {
+    private TestingSchedulingResultPartition(
+            IntermediateDataSetID dataSetID,
+            int partitionNum,

Review comment:
       `testNotifyingBlockingResultPartitionProducerFinished` and 
`testFinishedBlockingResultPartitionProducerDoNotScheduleNonCreatedRegions` in 
`PipelinedRegionSchedulingStrategyTest`.
   
   I'd prefer to change this, since `correlatedResultPartitionGroups` in 
`PipelinedRegionSchedulingStrategy` uses `IntermediateDataSetID` as its key. 
It's more reasonable to make the `IntermediateResultPartitionID`s have the same 
`IntermediateDataSetID`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
     private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> 
regionVertices =
             new HashMap<>();
 
-    private final Set<TestingSchedulingResultPartition> consumedPartitions = 
new HashSet<>();
+    private final List<ConsumedPartitionGroup> consumedPartitionGroups = new 
ArrayList<>();
+
+    private final Map<IntermediateResultPartitionID, 
TestingSchedulingResultPartition>

Review comment:
       Changed this field into a local variable.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
 
     private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
 
-    /** Result partitions are correlated if they have the same result id. */
-    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
-            correlatedResultPartitions = new HashMap<>();
+    /** ConsumedPartitionGroups are correlated if they have the same result 
id. */
+    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+            correlatedResultPartitionGroups = new HashMap<>();
 
-    private final Map<IntermediateResultPartitionID, 
Set<SchedulingPipelinedRegion>>
-            partitionConsumerRegions = new HashMap<>();
+    /** External consumer regions of each ConsumedPartitionGroup. */
+    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+            partitionGroupConsumerRegions = new HashMap<>();
 
     private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> 
regionVerticesSorted =
             new IdentityHashMap<>();
 
+    /** The ConsumedPartitionGroups which have inter-region producers and 
inter-region producers. */
+    private final Set<ConsumedPartitionGroup> 
crossRegionConsumerPartitionGroups =

Review comment:
       Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
##########
@@ -74,7 +74,10 @@ private void initRegionExecutionViewByVertex() {
         if (regionExecutionView.isFinished()) {
             final SchedulingPipelinedRegion pipelinedRegion =
                     
schedulingTopology.getPipelinedRegionOfVertex(finishedVertex);
-            return 
filterReleasablePartitions(pipelinedRegion.getConsumedResults());
+            return filterReleasablePartitions(

Review comment:
       This part will be improved by FLINK-21331 later. For this PR, I'll first 
make sure `getAllBlockingConsumedPartitionGroups` returns a `Set`. I'll also 
add a `distinct()` in `filterReleasablePartitions`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = 
getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private SchedulingPipelinedRegion 
getProducerRegion(IntermediateResultPartitionID partitionId) {
+        return schedulingTopology.getPipelinedRegionOfVertex(
+                
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+    }
+
+    private void initPartitionGroupConsumerRegions() {
+        for (SchedulingPipelinedRegion region : 
schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    region.getAllBlockingConsumedPartitionGroups()) {
+                if 
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+                        || 
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                    partitionGroupConsumerRegions
+                            .computeIfAbsent(consumedPartitionGroup, group -> 
new HashSet<>())
+                            .add(region);
+                }
+            }
+        }
+    }
+
+    private void initCorrelatedResultPartitionGroups() {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                partitionGroupConsumerRegions.keySet()) {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                correlatedResultPartitionGroups
+                        .computeIfAbsent(
+                                partitionId.getIntermediateDataSetID(), id -> 
new HashSet<>())
+                        .add(consumedPartitionGroup);
+            }
+        }
+    }
+
     @Override
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> 
!region.getConsumedResults().iterator().hasNext())
+                        .filter(this::noExternalBlockingConsumedPartitions)
                         .collect(Collectors.toSet());
         maybeScheduleRegions(sourceRegions);
     }
 
+    private boolean 
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                region.getAllBlockingConsumedPartitionGroups()) {
+            if 
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)

Review comment:
       I'd prefer that a region with cross-region partitions not be scheduled 
like a source region. A test case has been added.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
 
     private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
 
-    /** Result partitions are correlated if they have the same result id. */
-    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
-            correlatedResultPartitions = new HashMap<>();
+    /** ConsumedPartitionGroups are correlated if they have the same result 
id. */
+    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+            correlatedResultPartitionGroups = new HashMap<>();
 
-    private final Map<IntermediateResultPartitionID, 
Set<SchedulingPipelinedRegion>>
-            partitionConsumerRegions = new HashMap<>();
+    /** External consumer regions of each ConsumedPartitionGroup. */
+    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+            partitionGroupConsumerRegions = new HashMap<>();
 
     private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> 
regionVerticesSorted =
             new IdentityHashMap<>();
 
+    /** The ConsumedPartitionGroups which have inter-region producers and 
inter-region producers. */

Review comment:
       Thanks for pointing this out. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = 
getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private SchedulingPipelinedRegion 
getProducerRegion(IntermediateResultPartitionID partitionId) {
+        return schedulingTopology.getPipelinedRegionOfVertex(
+                
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+    }
+
+    private void initPartitionGroupConsumerRegions() {
+        for (SchedulingPipelinedRegion region : 
schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    region.getAllBlockingConsumedPartitionGroups()) {
+                if 
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+                        || 
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                    partitionGroupConsumerRegions
+                            .computeIfAbsent(consumedPartitionGroup, group -> 
new HashSet<>())
+                            .add(region);
+                }
+            }
+        }
+    }
+
+    private void initCorrelatedResultPartitionGroups() {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                partitionGroupConsumerRegions.keySet()) {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                correlatedResultPartitionGroups
+                        .computeIfAbsent(
+                                partitionId.getIntermediateDataSetID(), id -> 
new HashSet<>())
+                        .add(consumedPartitionGroup);
+            }
+        }
+    }
+
     @Override
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> 
!region.getConsumedResults().iterator().hasNext())
+                        .filter(this::noExternalBlockingConsumedPartitions)
                         .collect(Collectors.toSet());
         maybeScheduleRegions(sourceRegions);
     }
 
+    private boolean 
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                region.getAllBlockingConsumedPartitionGroups()) {
+            if 
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)

Review comment:
       Furthermore, @zhuzhurk and I found that, for the case illustrated in 
https://github.com/apache/flink/pull/15310#issuecomment-806018417, region 2 
won't be scheduled even without this PR. This is because the result partition 
produced by the third vertex in layer 2 is not consumable, since it is located 
in region 2. As a result, the IntermediateDataSet it belongs to will never be 
consumable, and region 2 will never be scheduled. We are going to discuss this 
issue in a new JIRA.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = 
getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);

Review comment:
       Yes. Sorry for being careless. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = 
getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private SchedulingPipelinedRegion 
getProducerRegion(IntermediateResultPartitionID partitionId) {
+        return schedulingTopology.getPipelinedRegionOfVertex(
+                
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+    }
+
+    private void initPartitionGroupConsumerRegions() {
+        for (SchedulingPipelinedRegion region : 
schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    region.getAllBlockingConsumedPartitionGroups()) {
+                if 
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+                        || 
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                    partitionGroupConsumerRegions
+                            .computeIfAbsent(consumedPartitionGroup, group -> 
new HashSet<>())
+                            .add(region);
+                }
+            }
+        }
+    }
+
+    private void initCorrelatedResultPartitionGroups() {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                partitionGroupConsumerRegions.keySet()) {
+            for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+                correlatedResultPartitionGroups
+                        .computeIfAbsent(
+                                partitionId.getIntermediateDataSetID(), id -> 
new HashSet<>())
+                        .add(consumedPartitionGroup);
+            }
+        }
+    }
+
     @Override
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> 
!region.getConsumedResults().iterator().hasNext())
+                        .filter(this::noExternalBlockingConsumedPartitions)
                         .collect(Collectors.toSet());
         maybeScheduleRegions(sourceRegions);
     }
 
+    private boolean 
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {

Review comment:
       Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,15 +246,50 @@ private void maybeScheduleRegion(final 
SchedulingPipelinedRegion region) {
         schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
     }
 
-    private boolean areRegionInputsAllConsumable(final 
SchedulingPipelinedRegion region) {
-        for (SchedulingResultPartition partition : 
region.getConsumedResults()) {
-            if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+    private boolean areRegionInputsAllConsumable(
+            final SchedulingPipelinedRegion region,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                region.getAllBlockingConsumedPartitionGroups()) {
+            if 
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)) {
+                if 
(!isCrossRegionConsumedPartitionConsumable(consumedPartitionGroup, region)) {
+                    return false;
+                }
+            } else if 
(isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                if (!consumableStatusCache.computeIfAbsent(
+                        consumedPartitionGroup, 
this::isConsumedPartitionGroupConsumable)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+            if (schedulingTopology.getResultPartition(partitionId).getState()
+                    != ResultPartitionState.CONSUMABLE) {
                 return false;
             }
         }
         return true;
     }
 
+    private boolean isCrossRegionConsumedPartitionConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup,
+            final SchedulingPipelinedRegion pipelinedRegion) {
+        for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+            if (isExternalConsumedPartition(partitionId, pipelinedRegion)) {

Review comment:
       Thanks for pointing this out. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
 
     private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
 
-    /** Result partitions are correlated if they have the same result id. */
-    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
-            correlatedResultPartitions = new HashMap<>();
+    /** ConsumedPartitionGroups are correlated if they have the same result 
id. */
+    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+            correlatedResultPartitionGroups = new HashMap<>();
 
-    private final Map<IntermediateResultPartitionID, 
Set<SchedulingPipelinedRegion>>
-            partitionConsumerRegions = new HashMap<>();
+    /** External consumer regions of each ConsumedPartitionGroup. */
+    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+            partitionGroupConsumerRegions = new HashMap<>();

Review comment:
       Resolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to