This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b01730579fa5afeb5eb6c203a9c6ecce611d7b46 Author: Weijie Guo <[email protected]> AuthorDate: Fri Aug 5 14:55:08 2022 +0800 [FLINK-28799] PipelinedRegionSchedulingStrategy maintain scheduled regions --- .../scheduler/strategy/PipelinedRegionSchedulingStrategy.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java index dda505a4653..2ace05c67fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java @@ -57,6 +57,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups = Collections.newSetFromMap(new IdentityHashMap<>()); + private final Set<SchedulingPipelinedRegion> scheduledRegions = new HashSet<>(); + public PipelinedRegionSchedulingStrategy( final SchedulerOperations schedulerOperations, final SchedulingTopology schedulingTopology) { @@ -163,6 +165,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { verticesToRestart.stream() .map(schedulingTopology::getPipelinedRegionOfVertex) .collect(Collectors.toSet()); + scheduledRegions.removeAll(regionsToRestart); maybeScheduleRegions(regionsToRestart); } @@ -216,7 +219,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { private void maybeScheduleRegion( final SchedulingPipelinedRegion region, final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) { - if (!areRegionInputsAllConsumable(region, consumableStatusCache)) { + if (scheduledRegions.contains(region) + || !areRegionInputsAllConsumable(region, consumableStatusCache)) { return; } @@ -225,6 +229,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { "BUG: trying to schedule a region which is not in CREATED state"); schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region)); + scheduledRegions.add(region); } private boolean areRegionInputsAllConsumable(
