Thesharing commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r602992946
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
##########
@@ -23,10 +23,19 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.topology.PipelinedRegion;
+import java.util.List;
+
/** Pipelined region on execution level, i.e., {@link ExecutionGraph} level. */
public interface SchedulingPipelinedRegion
extends PipelinedRegion<
ExecutionVertexID,
IntermediateResultPartitionID,
SchedulingExecutionVertex,
- SchedulingResultPartition> {}
+ SchedulingResultPartition> {
+ /**
+ * Get all blocking {@link ConsumedPartitionGroup}s.
+ *
+ * @return list of {@link ConsumedPartitionGroup}s
+ */
+ List<ConsumedPartitionGroup> getAllBlockingConsumedPartitionGroups();
Review comment:
Resolved. I'm wondering whether we should change
`SchedulingExecutionVertex#getConsumedPartitionGroups` and
`SchedulingResultPartition#getConsumerVertexGroups` into `Iterable`, too?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
package org.apache.flink.runtime.scheduler.adapter;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
/** Default implementation of {@link SchedulingPipelinedRegion}. */
public class DefaultSchedulingPipelinedRegion implements
SchedulingPipelinedRegion {
private final Map<ExecutionVertexID, DefaultExecutionVertex>
executionVertices;
- private Set<DefaultResultPartition> consumedResults;
+ private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+ private final Function<IntermediateResultPartitionID,
DefaultResultPartition>
+ resultPartitionRetriever;
+
+ public DefaultSchedulingPipelinedRegion(
+ Set<DefaultExecutionVertex> defaultExecutionVertices,
+ Function<IntermediateResultPartitionID, DefaultResultPartition>
+ resultPartitionRetriever) {
- public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex>
defaultExecutionVertices) {
Preconditions.checkNotNull(defaultExecutionVertices);
this.executionVertices = new HashMap<>();
for (DefaultExecutionVertex executionVertex :
defaultExecutionVertices) {
this.executionVertices.put(executionVertex.getId(),
executionVertex);
}
+
+ this.resultPartitionRetriever = resultPartitionRetriever;
Review comment:
Thanks for reminding. Added.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex>
regionVertices =
new HashMap<>();
- private final Set<TestingSchedulingResultPartition> consumedPartitions =
new HashSet<>();
+ private final List<ConsumedPartitionGroup> consumedPartitionGroups = new
ArrayList<>();
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
##########
@@ -31,7 +31,7 @@
import java.util.stream.Collectors;
/** Utils for {@link SchedulingStrategy}. */
-class SchedulingStrategyUtils {
+public class SchedulingStrategyUtils {
Review comment:
Sorry, I forgot to remove it after I removed the methods added in
the previous commits. Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -19,30 +19,51 @@
package org.apache.flink.runtime.scheduler.adapter;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
/** Default implementation of {@link SchedulingPipelinedRegion}. */
public class DefaultSchedulingPipelinedRegion implements
SchedulingPipelinedRegion {
private final Map<ExecutionVertexID, DefaultExecutionVertex>
executionVertices;
- private Set<DefaultResultPartition> consumedResults;
+ private List<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
+
+ private final Function<IntermediateResultPartitionID,
DefaultResultPartition>
+ resultPartitionRetriever;
+
+ public DefaultSchedulingPipelinedRegion(
+ Set<DefaultExecutionVertex> defaultExecutionVertices,
+ Function<IntermediateResultPartitionID, DefaultResultPartition>
+ resultPartitionRetriever) {
- public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex>
defaultExecutionVertices) {
Preconditions.checkNotNull(defaultExecutionVertices);
this.executionVertices = new HashMap<>();
for (DefaultExecutionVertex executionVertex :
defaultExecutionVertices) {
this.executionVertices.put(executionVertex.getId(),
executionVertex);
}
+
+ this.resultPartitionRetriever = resultPartitionRetriever;
+ }
+
+ @VisibleForTesting
+ public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex>
defaultExecutionVertices) {
Review comment:
Agreed. Removed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -60,23 +81,33 @@ public DefaultExecutionVertex getVertex(final
ExecutionVertexID vertexId) {
return executionVertex;
}
- @Override
- public Iterable<DefaultResultPartition> getConsumedResults() {
- if (consumedResults == null) {
- initializeConsumedResults();
- }
- return consumedResults;
- }
-
- private void initializeConsumedResults() {
- final Set<DefaultResultPartition> consumedResults = new HashSet<>();
+ private void initializeAllBlockingConsumedPartitionGroups() {
+ final Set<ConsumedPartitionGroup> consumedResultGroupSet = new
HashSet<>();
for (DefaultExecutionVertex executionVertex :
executionVertices.values()) {
- for (DefaultResultPartition resultPartition :
executionVertex.getConsumedResults()) {
- if
(!executionVertices.containsKey(resultPartition.getProducer().getId())) {
- consumedResults.add(resultPartition);
+ for (ConsumedPartitionGroup consumedResultGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+ SchedulingResultPartition resultPartition =
+
resultPartitionRetriever.apply(consumedResultGroup.getFirst());
+ if (resultPartition.getResultType().isBlocking()) {
+ consumedResultGroupSet.add(consumedResultGroup);
}
}
}
- this.consumedResults = Collections.unmodifiableSet(consumedResults);
+
+ this.blockingConsumedPartitionGroups =
Review comment:
Agreed. Resolved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -47,12 +47,16 @@
private ResultPartitionState state;
- TestingSchedulingResultPartition(
- IntermediateDataSetID dataSetID, ResultPartitionType type,
ResultPartitionState state) {
+ private TestingSchedulingResultPartition(
+ IntermediateDataSetID dataSetID,
+ int partitionNum,
Review comment:
`testNotifyingBlockingResultPartitionProducerFinished` and
`testFinishedBlockingResultPartitionProducerDoNotScheduleNonCreatedRegions` in
`PipelinedRegionSchedulingStrategyTest`.
I prefer that this should be changed, since `correlatedResultPartitionGroups`
in `PipelinedRegionSchedulingStrategy` uses `IntermediateDataSetID` as its key. It's
more reasonable to make each `IntermediateResultPartitionID` have the same
`IntermediateDataSetID`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
##########
@@ -30,15 +33,24 @@
private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex>
regionVertices =
new HashMap<>();
- private final Set<TestingSchedulingResultPartition> consumedPartitions =
new HashSet<>();
+ private final List<ConsumedPartitionGroup> consumedPartitionGroups = new
ArrayList<>();
+
+ private final Map<IntermediateResultPartitionID,
TestingSchedulingResultPartition>
Review comment:
Changed this field into a local variable.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
private final DeploymentOption deploymentOption = new
DeploymentOption(false);
- /** Result partitions are correlated if they have the same result id. */
- private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
- correlatedResultPartitions = new HashMap<>();
+ /** ConsumedPartitionGroups are correlated if they have the same result
id. */
+ private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+ correlatedResultPartitionGroups = new HashMap<>();
- private final Map<IntermediateResultPartitionID,
Set<SchedulingPipelinedRegion>>
- partitionConsumerRegions = new HashMap<>();
+ /** External consumer regions of each ConsumedPartitionGroup. */
+ private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+ partitionGroupConsumerRegions = new HashMap<>();
private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>>
regionVerticesSorted =
new IdentityHashMap<>();
+    /** The ConsumedPartitionGroups which have intra-region producers and
inter-region producers. */
+ private final Set<ConsumedPartitionGroup>
crossRegionConsumerPartitionGroups =
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
##########
@@ -74,7 +74,10 @@ private void initRegionExecutionViewByVertex() {
if (regionExecutionView.isFinished()) {
final SchedulingPipelinedRegion pipelinedRegion =
schedulingTopology.getPipelinedRegionOfVertex(finishedVertex);
- return
filterReleasablePartitions(pipelinedRegion.getConsumedResults());
+ return filterReleasablePartitions(
Review comment:
This part will be improved by FLINK-21331 later. But in this PR, first,
I'll make sure `getAllBlockingConsumedPartitionGroups` returns a `Set`. Also I'll
add a `distinct()` in `filterReleasablePartitions`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private SchedulingPipelinedRegion
getProducerRegion(IntermediateResultPartitionID partitionId) {
+ return schedulingTopology.getPipelinedRegionOfVertex(
+
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+ }
+
+ private void initPartitionGroupConsumerRegions() {
+ for (SchedulingPipelinedRegion region :
schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+ ||
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ partitionGroupConsumerRegions
+ .computeIfAbsent(consumedPartitionGroup, group ->
new HashSet<>())
+ .add(region);
+ }
+ }
+ }
+ }
+
+ private void initCorrelatedResultPartitionGroups() {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ partitionGroupConsumerRegions.keySet()) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ correlatedResultPartitionGroups
+ .computeIfAbsent(
+ partitionId.getIntermediateDataSetID(), id ->
new HashSet<>())
+ .add(consumedPartitionGroup);
+ }
+ }
+ }
+
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
- .filter(region ->
!region.getConsumedResults().iterator().hasNext())
+ .filter(this::noExternalBlockingConsumedPartitions)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
+ private boolean
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
Review comment:
I'd prefer that a region with cross-region partitions not be
scheduled like a source region. A test case has been added.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
private final DeploymentOption deploymentOption = new
DeploymentOption(false);
- /** Result partitions are correlated if they have the same result id. */
- private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
- correlatedResultPartitions = new HashMap<>();
+ /** ConsumedPartitionGroups are correlated if they have the same result
id. */
+ private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+ correlatedResultPartitionGroups = new HashMap<>();
- private final Map<IntermediateResultPartitionID,
Set<SchedulingPipelinedRegion>>
- partitionConsumerRegions = new HashMap<>();
+ /** External consumer regions of each ConsumedPartitionGroup. */
+ private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+ partitionGroupConsumerRegions = new HashMap<>();
private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>>
regionVerticesSorted =
new IdentityHashMap<>();
+    /** The ConsumedPartitionGroups which have intra-region producers and
inter-region producers. */
Review comment:
Thanks for pointing this out. Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private SchedulingPipelinedRegion
getProducerRegion(IntermediateResultPartitionID partitionId) {
+ return schedulingTopology.getPipelinedRegionOfVertex(
+
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+ }
+
+ private void initPartitionGroupConsumerRegions() {
+ for (SchedulingPipelinedRegion region :
schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+ ||
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ partitionGroupConsumerRegions
+ .computeIfAbsent(consumedPartitionGroup, group ->
new HashSet<>())
+ .add(region);
+ }
+ }
+ }
+ }
+
+ private void initCorrelatedResultPartitionGroups() {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ partitionGroupConsumerRegions.keySet()) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ correlatedResultPartitionGroups
+ .computeIfAbsent(
+ partitionId.getIntermediateDataSetID(), id ->
new HashSet<>())
+ .add(consumedPartitionGroup);
+ }
+ }
+ }
+
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
- .filter(region ->
!region.getConsumedResults().iterator().hasNext())
+ .filter(this::noExternalBlockingConsumedPartitions)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
+ private boolean
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
Review comment:
Furthermore, @zhuzhurk and I found that for the case illustrated in
https://github.com/apache/flink/pull/15310#issuecomment-806018417, region 2
won't be scheduled, even without this PR. This is because the result
partition produced by the third vertex in layer 2 is not consumable, since it
is located in region 2. As a result, the IntermediateDataSet it belongs to
will never be consumable, and region 2 will never be scheduled. We are going to
discuss this issue in a new JIRA.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
Review comment:
Yes. Sorry for being careless. Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -92,15 +92,82 @@ private void init() {
}
}
+ private void initCrossRegionConsumerPartitionGroups() {
+ Set<ConsumedPartitionGroup> visitedPartitionGroups =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
+ for (SchedulingPipelinedRegion pipelinedRegion :
+ schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+ if (!visitedPartitionGroups.contains(consumedPartitionGroup)) {
+ visitedPartitionGroups.add(consumedPartitionGroup);
+
+ SchedulingPipelinedRegion producerRegion = null;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ SchedulingPipelinedRegion region =
getProducerRegion(partitionId);
+ if (producerRegion == null) {
+ producerRegion = region;
+ } else if (producerRegion != region) {
+
crossRegionConsumerPartitionGroups.add(consumedPartitionGroup);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private SchedulingPipelinedRegion
getProducerRegion(IntermediateResultPartitionID partitionId) {
+ return schedulingTopology.getPipelinedRegionOfVertex(
+
schedulingTopology.getResultPartition(partitionId).getProducer().getId());
+ }
+
+ private void initPartitionGroupConsumerRegions() {
+ for (SchedulingPipelinedRegion region :
schedulingTopology.getAllPipelinedRegions()) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)
+ ||
isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ partitionGroupConsumerRegions
+ .computeIfAbsent(consumedPartitionGroup, group ->
new HashSet<>())
+ .add(region);
+ }
+ }
+ }
+ }
+
+ private void initCorrelatedResultPartitionGroups() {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ partitionGroupConsumerRegions.keySet()) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ correlatedResultPartitionGroups
+ .computeIfAbsent(
+ partitionId.getIntermediateDataSetID(), id ->
new HashSet<>())
+ .add(consumedPartitionGroup);
+ }
+ }
+ }
+
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
- .filter(region ->
!region.getConsumedResults().iterator().hasNext())
+ .filter(this::noExternalBlockingConsumedPartitions)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
+ private boolean
noExternalBlockingConsumedPartitions(SchedulingPipelinedRegion region) {
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,15 +246,50 @@ private void maybeScheduleRegion(final
SchedulingPipelinedRegion region) {
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
- private boolean areRegionInputsAllConsumable(final
SchedulingPipelinedRegion region) {
- for (SchedulingResultPartition partition :
region.getConsumedResults()) {
- if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+ private boolean areRegionInputsAllConsumable(
+ final SchedulingPipelinedRegion region,
+ final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ region.getAllBlockingConsumedPartitionGroups()) {
+ if
(crossRegionConsumerPartitionGroups.contains(consumedPartitionGroup)) {
+ if
(!isCrossRegionConsumedPartitionConsumable(consumedPartitionGroup, region)) {
+ return false;
+ }
+ } else if
(isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
+ if (!consumableStatusCache.computeIfAbsent(
+ consumedPartitionGroup,
this::isConsumedPartitionGroupConsumable)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean isConsumedPartitionGroupConsumable(
+ final ConsumedPartitionGroup consumedPartitionGroup) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ if (schedulingTopology.getResultPartition(partitionId).getState()
+ != ResultPartitionState.CONSUMABLE) {
return false;
}
}
return true;
}
+ private boolean isCrossRegionConsumedPartitionConsumable(
+ final ConsumedPartitionGroup consumedPartitionGroup,
+ final SchedulingPipelinedRegion pipelinedRegion) {
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ if (isExternalConsumedPartition(partitionId, pipelinedRegion)) {
Review comment:
Thanks for pointing this out. Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -49,16 +50,21 @@
private final DeploymentOption deploymentOption = new
DeploymentOption(false);
- /** Result partitions are correlated if they have the same result id. */
- private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>>
- correlatedResultPartitions = new HashMap<>();
+ /** ConsumedPartitionGroups are correlated if they have the same result
id. */
+ private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>>
+ correlatedResultPartitionGroups = new HashMap<>();
- private final Map<IntermediateResultPartitionID,
Set<SchedulingPipelinedRegion>>
- partitionConsumerRegions = new HashMap<>();
+ /** External consumer regions of each ConsumedPartitionGroup. */
+ private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>>
+ partitionGroupConsumerRegions = new HashMap<>();
Review comment:
Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]