This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b01730579fa5afeb5eb6c203a9c6ecce611d7b46
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Aug 5 14:55:08 2022 +0800

    [FLINK-28799] PipelinedRegionSchedulingStrategy maintain scheduled regions
---
 .../scheduler/strategy/PipelinedRegionSchedulingStrategy.java      | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index dda505a4653..2ace05c67fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -57,6 +57,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
             Collections.newSetFromMap(new IdentityHashMap<>());
 
+    private final Set<SchedulingPipelinedRegion> scheduledRegions = new HashSet<>();
+
     public PipelinedRegionSchedulingStrategy(
             final SchedulerOperations schedulerOperations,
             final SchedulingTopology schedulingTopology) {
@@ -163,6 +165,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
                 verticesToRestart.stream()
                         .map(schedulingTopology::getPipelinedRegionOfVertex)
                         .collect(Collectors.toSet());
+        scheduledRegions.removeAll(regionsToRestart);
         maybeScheduleRegions(regionsToRestart);
     }
 
@@ -216,7 +219,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     private void maybeScheduleRegion(
             final SchedulingPipelinedRegion region,
             final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
-        if (!areRegionInputsAllConsumable(region, consumableStatusCache)) {
+        if (scheduledRegions.contains(region)
+                || !areRegionInputsAllConsumable(region, consumableStatusCache)) {
             return;
         }
 
@@ -225,6 +229,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
                 "BUG: trying to schedule a region which is not in CREATED state");
 
         schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
+        scheduledRegions.add(region);
     }
 
     private boolean areRegionInputsAllConsumable(

Reply via email to