zhuzhurk commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r602884482



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 /** Default implementation of {@link SchedulingPipelinedRegion}. */
 public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegion {
 
     private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVertices;
 
-    private Set<DefaultResultPartition> consumedResults;
+    private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+    private final Function<IntermediateResultPartitionID, DefaultResultPartition>
+            resultPartitionRetriever;
+
+    public DefaultSchedulingPipelinedRegion(
+            Set<DefaultExecutionVertex> defaultExecutionVertices,
+            Function<IntermediateResultPartitionID, DefaultResultPartition>
+                    resultPartitionRetriever) {
 
-    public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> defaultExecutionVertices) {
         Preconditions.checkNotNull(defaultExecutionVertices);
 
         this.executionVertices = new HashMap<>();
         for (DefaultExecutionVertex executionVertex : defaultExecutionVertices) {
             this.executionVertices.put(executionVertex.getId(), executionVertex);
         }
+
+        this.resultPartitionRetriever = resultPartitionRetriever;
+    }
+
+    @VisibleForTesting
+    public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> defaultExecutionVertices) {

Review comment:
       Can we avoid introducing such a test-only constructor? I also think `resultPartitionRetriever` should never be `null`.
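
       A sketch of what I mean (keeping only the single remaining constructor; Flink's `Preconditions.checkNotNull` returns its argument):

    public DefaultSchedulingPipelinedRegion(
            Set<DefaultExecutionVertex> defaultExecutionVertices,
            Function<IntermediateResultPartitionID, DefaultResultPartition>
                    resultPartitionRetriever) {
        Preconditions.checkNotNull(defaultExecutionVertices);
        this.resultPartitionRetriever = Preconditions.checkNotNull(resultPartitionRetriever);
        // ... populate executionVertices as in the current constructor ...
    }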

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -47,12 +47,16 @@
 
     private ResultPartitionState state;
 
-    TestingSchedulingResultPartition(
-            IntermediateDataSetID dataSetID, ResultPartitionType type, ResultPartitionState state) {
+    private TestingSchedulingResultPartition(
+            IntermediateDataSetID dataSetID,
+            int partitionNum,

Review comment:
       Which tests would break without this change?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
 
     private final DeploymentOption deploymentOption = new DeploymentOption(false);
 
-    /** Result partitions are correlated if they have the same result id. */
-    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
-            correlatedResultPartitions = new HashMap<>();
+    /** ConsumedPartitionGroups are correlated if they have the same result id. */
+    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+            correlatedResultPartitionGroups = new HashMap<>();
 
-    private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>>
-            partitionConsumerRegions = new HashMap<>();
+    /** External consumer regions of each ConsumedPartitionGroup. */
+    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+            partitionGroupConsumerRegions = new HashMap<>();

Review comment:
       Better to use an `IdentityHashMap` here.
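
       For example, only the field declaration needs to change (sketch):

    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
            partitionGroupConsumerRegions = new IdentityHashMap<>();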

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
##########
@@ -74,7 +74,10 @@ private void initRegionExecutionViewByVertex() {
         if (regionExecutionView.isFinished()) {
             final SchedulingPipelinedRegion pipelinedRegion =
                     schedulingTopology.getPipelinedRegionOfVertex(finishedVertex);
-            return filterReleasablePartitions(pipelinedRegion.getConsumedResults());
+            return filterReleasablePartitions(

Review comment:
       I think the input of `filterReleasablePartitions` should be changed to a `Set` to ensure there are no duplicated elements. Or at least we should add a deduplication step (e.g. the streams `distinct()` operation) in the implementation of `filterReleasablePartitions`.
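
       A sketch of the second option; the parameter and return types here are placeholders, and `isPartitionReleasable` is a hypothetical stand-in for the existing filtering logic:

    private List<IntermediateResultPartitionID> filterReleasablePartitions(
            List<IntermediateResultPartitionID> partitionIds) {
        return partitionIds.stream()
                .distinct() // drop duplicated elements before filtering
                .filter(this::isPartitionReleasable)
                .collect(Collectors.toList());
    }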

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
 
     private final DeploymentOption deploymentOption = new DeploymentOption(false);
 
-    /** Result partitions are correlated if they have the same result id. */
-    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
-            correlatedResultPartitions = new HashMap<>();
+    /** ConsumedPartitionGroups are correlated if they have the same result id. */
+    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+            correlatedResultPartitionGroups = new HashMap<>();
 
-    private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>>
-            partitionConsumerRegions = new HashMap<>();
+    /** External consumer regions of each ConsumedPartitionGroup. */
+    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+            partitionGroupConsumerRegions = new HashMap<>();
 
     private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted =
             new IdentityHashMap<>();
 
+    /** The ConsumedPartitionGroups which have inter-region producers and inter-region producers. */
+    private final Set<ConsumedPartitionGroup> crossRegionConsumerPartitionGroups =

Review comment:
       `crossRegionConsumerPartitionGroups` -> `crossRegionConsumedPartitionGroups`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);

Review comment:
       break;
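
       I.e., once the group is known to be cross-region, the remaining partitions of the group need not be checked (sketch):

                        } else if (producerRegion != region) {
                            crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
                            break;
                        }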

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private SchedulingPipelinedRegion getProducerRegion(IntermediateResultPartitionID partitionId) {
+        return schedulingTopology.getPipelinedRegionOfVertex(
+                schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+    }
+
+    private void initPartitionGroupConsumerRegions() {
+        for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    region.getAllBlockingConsumedPartitionGroups()) {
+                if (crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+                        || isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                    partitionGroupConsumerRegions
+                            .computeIfAbsent(consumedPartitionGroup, group -> new HashSet<>())
+                            .add(region);
+                }
+            }
+        }
+    }
+
+    private void initCorrelatedResultPartitionGroups() {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                partitionGroupConsumerRegions.keySet()) {
+            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                correlatedResultPartitionGroups
+                        .computeIfAbsent(
+                                partitionId.getIntermediateDataSetID(), id -> new HashSet<>())
+                        .add(consumedPartitionGroup);
+            }
+        }
+    }
+
     @Override
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
+                        .filter(this::noExternalBlockingConsumedPartitions)
                         .collect(Collectors.toSet());
         maybeScheduleRegions(sourceRegions);
     }
 
+    private boolean noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {

Review comment:
       I would prefer this method to be named `isSourceRegion`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
##########
@@ -23,10 +23,19 @@
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.topology.PipelinedRegion;
 
+import java.util.List;
+
 /** Pipelined region on execution level, i.e., {@link ExecutionGraph} level. */
 public interface SchedulingPipelinedRegion
         extends PipelinedRegion<
                 ExecutionVertexID,
                 IntermediateResultPartitionID,
                 SchedulingExecutionVertex,
-                SchedulingResultPartition> {}
+                SchedulingResultPartition> {
+    /**
+     * Get all blocking {@link ConsumedPartitionGroup}s.
+     *
+     * @return list of {@link ConsumedPartitionGroup}s
+     */
+    List<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups();

Review comment:
       I would suggest making this an `Iterable`.
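
       I.e. (sketch):

    Iterable<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups();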

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {

Review comment:
       `initCrossRegionConsumerPartitionGroups` -> `initCrossRegionConsumedPartitionGroups`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
 
     private final DeploymentOption deploymentOption = new DeploymentOption(false);
 
-    /** Result partitions are correlated if they have the same result id. */
-    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
-            correlatedResultPartitions = new HashMap<>();
+    /** ConsumedPartitionGroups are correlated if they have the same result id. */
+    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+            correlatedResultPartitionGroups = new HashMap<>();
 
-    private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>>
-            partitionConsumerRegions = new HashMap<>();
+    /** External consumer regions of each ConsumedPartitionGroup. */
+    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+            partitionGroupConsumerRegions = new HashMap<>();
 
     private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted =
             new IdentityHashMap<>();
 
+    /** The ConsumedPartitionGroups which have inter-region producers and inter-region producers. */

Review comment:
       `which have inter-region producers and inter-region producers.` -> `which are produced by multiple regions.`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 /** Default implementation of {@link SchedulingPipelinedRegion}. */
 public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegion {
 
     private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVertices;
 
-    private Set<DefaultResultPartition> consumedResults;
+    private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+    private final Function<IntermediateResultPartitionID, DefaultResultPartition>
+            resultPartitionRetriever;
+
+    public DefaultSchedulingPipelinedRegion(
+            Set<DefaultExecutionVertex> defaultExecutionVertices,
+            Function<IntermediateResultPartitionID, DefaultResultPartition>
+                    resultPartitionRetriever) {
 
-    public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> defaultExecutionVertices) {
         Preconditions.checkNotNull(defaultExecutionVertices);
 
         this.executionVertices = new HashMap<>();
         for (DefaultExecutionVertex executionVertex : defaultExecutionVertices) {
             this.executionVertices.put(executionVertex.getId(), executionVertex);
         }
+
+        this.resultPartitionRetriever = resultPartitionRetriever;

Review comment:
       checkNotNull
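
       I.e., since Flink's `Preconditions.checkNotNull` returns its argument (sketch):

        this.resultPartitionRetriever = Preconditions.checkNotNull(resultPartitionRetriever);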

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
     private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> regionVertices =
             new HashMap<>();
 
-    private final Set<TestingSchedulingResultPartition> consumedPartitions = new HashSet<>();
+    private final List<ConsumedPartitionGroup> consumedPartitionGroups = new ArrayList<>();
+
+    private final Map<IntermediateResultPartitionID, TestingSchedulingResultPartition>

Review comment:
       Is it needed as a field?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
     private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> regionVertices =
             new HashMap<>();
 
-    private final Set<TestingSchedulingResultPartition> consumedPartitions = new HashSet<>();
+    private final List<ConsumedPartitionGroup> consumedPartitionGroups = new ArrayList<>();

Review comment:
       Could we make it a `Set` instead of a `List`?
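
       E.g. (sketch; a `HashSet` behaves like an identity set here as long as `ConsumedPartitionGroup` does not override `equals`):

    private final Set<ConsumedPartitionGroup> consumedPartitionGroups = new HashSet<>();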

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -60,23 +81,33 @@ public DefaultExecutionVertex getVertex(final ExecutionVertexID vertexId) {
         return executionVertex;
     }
 
-    @Override
-    public Iterable<DefaultResultPartition> getConsumedResults() {
-        if (consumedResults == null) {
-            initializeConsumedResults();
-        }
-        return consumedResults;
-    }
-
-    private void initializeConsumedResults() {
-        final Set<DefaultResultPartition> consumedResults = new HashSet<>();
+    private void initializeAllBlockingConsumedPartitionGroups() {
+        final Set<ConsumedPartitionGroup> consumedResultGroupSet = new HashSet<>();
         for (DefaultExecutionVertex executionVertex : executionVertices.values()) {
-            for (DefaultResultPartition resultPartition : executionVertex.getConsumedResults()) {
-                if (!executionVertices.containsKey(resultPartition.getProducer().getId())) {
-                    consumedResults.add(resultPartition);
+            for (ConsumedPartitionGroup consumedResultGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+                SchedulingResultPartition resultPartition =
+                        resultPartitionRetriever.apply(consumedResultGroup.getFirst());
+                if (resultPartition.getResultType().isBlocking()) {
+                    consumedResultGroupSet.add(consumedResultGroup);
                 }
             }
         }
-        this.consumedResults = Collections.unmodifiableSet(consumedResults);
+
+        this.blockingConsumedPartitionGroups =

Review comment:
       There is no need to convert the `Set` to a `List`. I think `blockingConsumedPartitionGroups` should just be a `Set`.
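
       I.e. (sketch):

    private Set<ConsumedPartitionGroup> blockingConsumedPartitionGroups;

    // ... and in initializeAllBlockingConsumedPartitionGroups():
    this.blockingConsumedPartitionGroups = Collections.unmodifiableSet(consumedResultGroupSet);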

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
##########
@@ -31,7 +31,7 @@
 import java.util.stream.Collectors;
 
 /** Utils for {@link SchedulingStrategy}. */
-class SchedulingStrategyUtils {
+public class SchedulingStrategyUtils {

Review comment:
       There is no need for this change.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,15 +246,50 @@ private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
         schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
     }
 
-    private boolean areRegionInputsAllConsumable(final SchedulingPipelinedRegion region) {
-        for (SchedulingResultPartition partition : region.getConsumedResults()) {
-            if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+    private boolean areRegionInputsAllConsumable(
+            final SchedulingPipelinedRegion region,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                region.getAllBlockingConsumedPartitionGroups()) {
+            if (crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)) {
+                if (!isCrossRegionConsumedPartitionConsumable(consumedPartitionGroup, region)) {
+                    return false;
+                }
+            } else if (isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                if (!consumableStatusCache.computeIfAbsent(
+                        consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+            if (schedulingTopology.getResultPartition(partitionId).getState()
+                    != ResultPartitionState.CONSUMABLE) {
                 return false;
             }
         }
         return true;
     }
 
+    private boolean isCrossRegionConsumedPartitionConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup,
+            final SchedulingPipelinedRegion pipelinedRegion) {
+        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+            if (isExternalConsumedPartition(partitionId, pipelinedRegion)) {

Review comment:
       It's better to merge the two nested `if`s into one.
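
       Something like this (a sketch; I am assuming the inner `if` checks the partition state, which is cut off in the diff above):

            if (isExternalConsumedPartition(partitionId, pipelinedRegion)
                    && schedulingTopology.getResultPartition(partitionId).getState()
                            != ResultPartitionState.CONSUMABLE) {
                return false;
            }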

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
         }
     }
 
+    private void initCrossRegionConsumerPartitionGroups() {
+        Set<ConsumedPartitionGroup> visitedPartitionGroups =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+                    visitedPartitionGroups.add(consumedPartitionGroup);
+
+                    SchedulingPipelinedRegion producerRegion = null;
+                    for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                        SchedulingPipelinedRegion region = getProducerRegion(partitionId);
+                        if (producerRegion == null) {
+                            producerRegion = region;
+                        } else if (producerRegion != region) {
+                            crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private SchedulingPipelinedRegion getProducerRegion(IntermediateResultPartitionID partitionId) {
+        return schedulingTopology.getPipelinedRegionOfVertex(
+                schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+    }
+
+    private void initPartitionGroupConsumerRegions() {
+        for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    region.getAllBlockingConsumedPartitionGroups()) {
+                if (crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+                        || isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+                    partitionGroupConsumerRegions
+                            .computeIfAbsent(consumedPartitionGroup, group -> new HashSet<>())
+                            .add(region);
+                }
+            }
+        }
+    }
+
+    private void initCorrelatedResultPartitionGroups() {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                partitionGroupConsumerRegions.keySet()) {
+            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+                correlatedResultPartitionGroups
+                        .computeIfAbsent(
+                                partitionId.getIntermediateDataSetID(), id -> new HashSet<>())
+                        .add(consumedPartitionGroup);
+            }
+        }
+    }
+
     @Override
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
+                        .filter(this::noExternalBlockingConsumedPartitions)
                         .collect(Collectors.toSet());
         maybeScheduleRegions(sourceRegions);
     }
 
+    private boolean noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                region.getAllBlockingConsumedPartitionGroups()) {
+            if (crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)

Review comment:
       It is not correct here. If a region has cross-region `ConsumedPartitionGroup`s but no external `ConsumedPartitionGroup`, it should also be treated as a source region.
   I think we need to add a test case for it.
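
       One possible shape for the fix (a sketch only, reusing the `isExternalConsumedPartition` helper from this PR; for a cross-region group the check has to be per partition, since only partitions produced outside the region should disqualify it):

    private boolean isSourceRegion(SchedulingPipelinedRegion region) {
        for (ConsumedPartitionGroup consumedPartitionGroup :
                region.getAllBlockingConsumedPartitionGroups()) {
            if (crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)) {
                // a cross-region group disqualifies the region only if at least
                // one of its partitions is produced outside this region
                for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                    if (isExternalConsumedPartition(partitionId, region)) {
                        return false;
                    }
                }
            } else if (isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                return false;
            }
        }
        return true;
    }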




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
