zhuzhurk commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r603082091



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -96,7 +101,7 @@ private void init() {
     public void startScheduling() {
         final Set<SchedulingPipelinedRegion> sourceRegions =
                 IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
-                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
+                        .filter(region -> !region.getConsumedPartitionGroups().iterator().hasNext())

Review comment:
       outdated.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,9 +184,24 @@ private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
         schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
     }
 
-    private boolean areRegionInputsAllConsumable(final SchedulingPipelinedRegion region) {
-        for (SchedulingResultPartition partition : region.getConsumedResults()) {
-            if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+    private boolean areRegionInputsAllConsumable(
+            final SchedulingPipelinedRegion region,
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+        for (ConsumedPartitionGroup consumedPartitionGroup : region.getConsumedPartitionGroups()) {
+            if (!consumableStatusCache.computeIfAbsent(
+                    consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        // For grouped pipelined result partitions, they may not be consumable at the same time

Review comment:
       outdated.





