zhuzhurk commented on code in PR #22798:
URL: https://github.com/apache/flink/pull/22798#discussion_r1233332596


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java:
##########
@@ -65,6 +65,18 @@ public boolean isInputConsumable(
         return true;
     }
 
+    /**
+     * This method is only used to filter consumable consumed partition group in the
+     * onExecutionStateChange method of {@link VertexwiseSchedulingStrategy}. For hybrid shuffle
+     * mode, the downstream vertices will be scheduled together with their upstreams. Therefore,
+     * only blocking consumed partition group needs to be considered here.
+     */
+    @Override
+    public boolean isConsumedPartitionGroupConsumable(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        return consumedPartitionGroup.areAllPartitionsFinished();

Review Comment:
   It looks to me like the logic here should be:
   ```
           if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
               return false;
           } else {
               return consumedPartitionGroup.areAllPartitionsFinished();
           }
   ```
   
   The pipelined-consumable branch uses `return false` because we only expect
   this method to be invoked to find groups that become consumable after a
   vertex finishes. It could otherwise be `return true`, because with
   `DefaultInputConsumableDecider` a `ConsumedPartitionGroup` is always
   consumable for a hybrid shuffle downstream task. However, `return false`
   helps improve the scheduling performance for hybrid shuffle.
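
To make the suggested behavior concrete, here is a minimal standalone sketch. It does not use Flink's real classes: `PartitionType` and `Group` are hypothetical stand-ins for `ResultPartitionType` and `ConsumedPartitionGroup`, reduced to the two queries the suggestion relies on, and the finished/total counters are an assumption made only for illustration.

```java
public class ConsumableGroupSketch {

    /** Hypothetical, simplified stand-in for Flink's ResultPartitionType. */
    public enum PartitionType {
        BLOCKING(false),
        HYBRID_FULL(true);

        private final boolean pipelinedConsumable;

        PartitionType(boolean pipelinedConsumable) {
            this.pipelinedConsumable = pipelinedConsumable;
        }

        public boolean canBePipelinedConsumed() {
            return pipelinedConsumable;
        }
    }

    /** Hypothetical stand-in for ConsumedPartitionGroup; the counters are illustrative only. */
    public static final class Group {
        private final PartitionType type;
        private final int finishedPartitions;
        private final int totalPartitions;

        public Group(PartitionType type, int finishedPartitions, int totalPartitions) {
            this.type = type;
            this.finishedPartitions = finishedPartitions;
            this.totalPartitions = totalPartitions;
        }

        public PartitionType getResultPartitionType() {
            return type;
        }

        public boolean areAllPartitionsFinished() {
            return finishedPartitions == totalPartitions;
        }
    }

    /** Mirrors the logic proposed in the review comment. */
    public static boolean isConsumedPartitionGroupConsumable(Group group) {
        if (group.getResultPartitionType().canBePipelinedConsumed()) {
            // Such groups are handled when the downstream is scheduled together
            // with its upstream, so they are skipped here.
            return false;
        }
        // Blocking groups become consumable only once every partition is finished.
        return group.areAllPartitionsFinished();
    }

    public static void main(String[] args) {
        System.out.println(isConsumedPartitionGroupConsumable(
                new Group(PartitionType.HYBRID_FULL, 2, 2))); // false
        System.out.println(isConsumedPartitionGroupConsumable(
                new Group(PartitionType.BLOCKING, 2, 2)));    // true
        System.out.println(isConsumedPartitionGroupConsumable(
                new Group(PartitionType.BLOCKING, 1, 2)));    // false
    }
}
```

In this sketch a hybrid group is never reported, even when all of its partitions are finished, so a caller filtering groups after a vertex finishes would only iterate over blocking groups.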


