zhuzhurk commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r600155822
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,9 +184,24 @@ private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
         schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
     }
-    private boolean areRegionInputsAllConsumable(final SchedulingPipelinedRegion region) {
-        for (SchedulingResultPartition partition : region.getConsumedResults()) {
-            if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+    private boolean areRegionInputsAllConsumable(
+            final SchedulingPipelinedRegion region,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup :
+                region.getConsumedPartitionGroups()) {
+            if (!consumableStatusCache.computeIfAbsent(
+                    consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        // For grouped pipelined result partitions, they may not be consumable at the same time
Review comment:
I did not quite understand this comment. Could you explain it?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -62,21 +85,34 @@ public DefaultExecutionVertex getVertex(final ExecutionVertexID vertexId) {
     @Override
     public Iterable<DefaultResultPartition> getConsumedResults() {
-        if (consumedResults == null) {
+        if (consumedPartitionGroups == null) {
             initializeConsumedResults();
         }
-        return consumedResults;
+        return () -> flatMap(consumedPartitionGroups, resultPartitionRetriever);
     }
     private void initializeConsumedResults() {
-        final Set<DefaultResultPartition> consumedResults = new HashSet<>();
+        final Set<ConsumedPartitionGroup> consumedResultGroupSet = new HashSet<>();
         for (DefaultExecutionVertex executionVertex : executionVertices.values()) {
-            for (DefaultResultPartition resultPartition : executionVertex.getConsumedResults()) {
+            for (ConsumedPartitionGroup consumedResultGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+                SchedulingResultPartition resultPartition =
+                        resultPartitionRetriever.apply(consumedResultGroup.getFirst());
Review comment:
This is not correct, because `consumedResultGroup` can span multiple regions. It is also a problem if a `ConsumedPartitionGroup` contains both intra-region and inter-region consumed partitions.
I think we need a clear way to avoid this case, or to solve it, before we continue with this PR.
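To make the concern concrete, here is a minimal, self-contained sketch. The `Partition` class, `producerRegion` field, and `interRegionPartitions` helper are hypothetical stand-ins, not Flink's actual API; the point is only that inspecting `getFirst()` alone can misclassify a group whose partitions come from different regions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a consumed result partition: we model only the
// region that produces it. This is NOT Flink's API, just an illustration.
final class Partition {
    final String id;
    final String producerRegion;

    Partition(String id, String producerRegion) {
        this.id = id;
        this.producerRegion = producerRegion;
    }
}

public class ConsumedGroupSketch {

    // Classify every partition in the group instead of only the first one,
    // since a single group may mix intra-region and inter-region partitions.
    static List<Partition> interRegionPartitions(List<Partition> group, String consumerRegion) {
        List<Partition> result = new ArrayList<>();
        for (Partition p : group) {
            if (!p.producerRegion.equals(consumerRegion)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The first partition is produced inside the consumer's own region,
        // but the second comes from another region. Looking only at the first
        // element would wrongly treat the whole group as intra-region.
        List<Partition> group = Arrays.asList(
                new Partition("p1", "regionA"),
                new Partition("p2", "regionB"));
        System.out.println(interRegionPartitions(group, "regionA").size());
    }
}
```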
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -48,11 +48,15 @@
private ResultPartitionState state;
TestingSchedulingResultPartition(
Review comment:
This constructor can be private.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -96,7 +101,7 @@ private void init() {
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
+                        .filter(region -> !region.getConsumedPartitionGroups().iterator().hasNext())
Review comment:
We need to take care of the case where `region.getConsumedPartitionGroups()` contains intra-region partitions.
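As a rough illustration of the issue, a simplified sketch (hypothetical `isSourceRegion` helper over stand-in types, not Flink's API): if a consumed partition group may hold intra-region partitions, a plain "has any consumed group" check would wrongly exclude a region that only consumes its own output, so the filter has to look at where each partition is produced:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simplified model (not Flink's API): a consumed group is just a
// list of producer-region names.
public class SourceRegionSketch {

    // A region should still count as a "source" region when all of its consumed
    // partitions are produced inside the region itself; only an inter-region
    // input disqualifies it.
    static boolean isSourceRegion(String regionName, List<List<String>> consumedGroups) {
        for (List<String> group : consumedGroups) {
            for (String producerRegion : group) {
                if (!producerRegion.equals(regionName)) {
                    return false; // consumes the output of another region
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // No inputs at all: clearly a source region.
        System.out.println(isSourceRegion("regionA", Collections.emptyList()));
        // Only intra-region inputs: still a source region under this rule,
        // unlike a check that merely tests whether any consumed group exists.
        System.out.println(isSourceRegion("regionA",
                Arrays.asList(Arrays.asList("regionA"))));
        // One inter-region input: not a source region.
        System.out.println(isSourceRegion("regionA",
                Arrays.asList(Arrays.asList("regionA", "regionB"))));
    }
}
```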
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -115,6 +119,7 @@ void setState(ResultPartitionState state) {
     /** Builder for {@link TestingSchedulingResultPartition}. */
     public static final class Builder {
         private IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();
+        private int partitionNum = -1;
Review comment:
I would use the valid value 0 as the default `partitionNum`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]