zhuzhurk commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r600155822



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,9 +184,24 @@ private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
         schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
     }
 
-    private boolean areRegionInputsAllConsumable(final SchedulingPipelinedRegion region) {
-        for (SchedulingResultPartition partition : region.getConsumedResults()) {
-            if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+    private boolean areRegionInputsAllConsumable(
+            final SchedulingPipelinedRegion region,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup : region.getConsumedPartitionGroups()) {
+            if (!consumableStatusCache.computeIfAbsent(
+                    consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        // For grouped pipelined result partitions, they may not be consumable at the same time

Review comment:
       I did not quite understand this comment. Would you explain it?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
##########
@@ -62,21 +85,34 @@ public DefaultExecutionVertex getVertex(final ExecutionVertexID vertexId) {
 
     @Override
     public Iterable<DefaultResultPartition> getConsumedResults() {
-        if (consumedResults == null) {
+        if (consumedPartitionGroups == null) {
             initializeConsumedResults();
         }
-        return consumedResults;
+        return () -> flatMap(consumedPartitionGroups, resultPartitionRetriever);
     }
 
     private void initializeConsumedResults() {
-        final Set<DefaultResultPartition> consumedResults = new HashSet<>();
+        final Set<ConsumedPartitionGroup> consumedResultGroupSet = new HashSet<>();
         for (DefaultExecutionVertex executionVertex : executionVertices.values()) {
-            for (DefaultResultPartition resultPartition : executionVertex.getConsumedResults()) {
+            for (ConsumedPartitionGroup consumedResultGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+                SchedulingResultPartition resultPartition =
+                        resultPartitionRetriever.apply(consumedResultGroup.getFirst());

Review comment:
       This is not correct, because a `consumedResultGroup` can span multiple 
regions. It is also a problem if a `ConsumedPartitionGroup` contains both 
intra-region and inter-region consumed partitions.
   I think we need a clear way to avoid this case, or to solve it, before we 
continue with this PR.
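
   To make the concern concrete, here is a minimal sketch of how a group could be classified relative to the consuming region. It does not use Flink's types: `GroupLocalitySketch`, `Locality`, and `classify` are hypothetical names, partitions and regions are plain string IDs, and the producer lookup is assumed to be available as a map. It only illustrates the three cases (intra-region, inter-region, mixed) and is not a proposal for the actual fix.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for illustration only; not a Flink class. */
final class GroupLocalitySketch {

    enum Locality {
        INTRA_REGION,
        INTER_REGION,
        MIXED
    }

    /**
     * Classifies one consumed partition group relative to the consuming region.
     *
     * @param partitionIds IDs of the partitions in the group (stand-in for a ConsumedPartitionGroup)
     * @param producerRegionOf assumed lookup from partition ID to the ID of its producing region
     * @param consumerRegionId ID of the region that consumes the group
     */
    static Locality classify(
            List<String> partitionIds,
            Map<String, String> producerRegionOf,
            String consumerRegionId) {
        if (partitionIds.isEmpty()) {
            throw new IllegalArgumentException("A group must contain at least one partition");
        }
        boolean sawInternal = false;
        boolean sawExternal = false;
        for (String partitionId : partitionIds) {
            // A partition with an unknown producer is treated as external here.
            if (consumerRegionId.equals(producerRegionOf.get(partitionId))) {
                sawInternal = true;
            } else {
                sawExternal = true;
            }
        }
        if (sawInternal && sawExternal) {
            return Locality.MIXED;
        }
        return sawInternal ? Locality.INTRA_REGION : Locality.INTER_REGION;
    }
}
```

   Looking only at the first partition of a group, as the changed line does, cannot tell the `MIXED` case apart from the other two.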

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -48,11 +48,15 @@
     private ResultPartitionState state;
 
     TestingSchedulingResultPartition(

Review comment:
       This constructor can be private.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -96,7 +101,7 @@ private void init() {
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
+                        .filter(region -> !region.getConsumedPartitionGroups().iterator().hasNext())

Review comment:
       We need to handle this case if `region.getConsumedPartitionGroups()` can 
contain intra-region partitions.
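
   As a rough illustration of one way to handle it, the sketch below treats a region as a source region when none of its consumed partitions is produced outside the region, so intra-region partitions alone do not disqualify it. The types are simplified stand-ins rather than Flink's: `SourceRegionSketch` and `isSourceRegion` are hypothetical names, IDs are plain strings, and the partition-to-producer-region map is an assumption.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for illustration only; not a Flink class. */
final class SourceRegionSketch {

    /**
     * Returns true if the region consumes no partition produced by another region.
     *
     * @param regionId ID of the region being checked
     * @param consumedGroups consumed partition groups, each given as a list of partition IDs
     * @param producerRegionOf assumed lookup from partition ID to the ID of its producing region
     */
    static boolean isSourceRegion(
            String regionId,
            List<List<String>> consumedGroups,
            Map<String, String> producerRegionOf) {
        for (List<String> group : consumedGroups) {
            for (String partitionId : group) {
                // A partition with an unknown producer is conservatively treated as external.
                if (!regionId.equals(producerRegionOf.get(partitionId))) {
                    return false;
                }
            }
        }
        // Either nothing is consumed, or everything consumed is produced inside the region.
        return true;
    }
}
```

   By contrast, a plain emptiness check like the one in the changed line would reject a region whose only consumed groups are intra-region.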

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -115,6 +119,7 @@ void setState(ResultPartitionState state) {
     /** Builder for {@link TestingSchedulingResultPartition}. */
     public static final class Builder {
         private IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();
+        private int partitionNum = -1;

Review comment:
       I would use a valid value, 0, as the default `partitionNum`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

