This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 662e2be630da19d44117e6c75fb6964ec0eafc61
Author: Weijie Guo <res...@163.com>
AuthorDate: Mon Dec 23 11:02:47 2024 +0800

    [FLINK-36880][network] Handle the finished vertex in InputConsumableDecider
---
 .../strategy/AllFinishedInputConsumableDecider.java |  5 ++++-
 .../strategy/DefaultInputConsumableDecider.java     | 21 +++++++++++++++++----
 .../scheduler/strategy/InputConsumableDecider.java  |  5 ++++-
 .../PartialFinishedInputConsumableDecider.java      |  5 ++++-
 .../strategy/VertexwiseSchedulingStrategy.java      |  5 ++++-
 .../strategy/DefaultInputConsumableDeciderTest.java |  4 +++-
 .../strategy/VertexwiseSchedulingStrategyTest.java  |  2 +-
 7 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
index f8cbb260488..1c4020afd32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
+import org.apache.flink.runtime.execution.ExecutionState;
+
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -60,7 +62,8 @@ public class AllFinishedInputConsumableDecider implements InputConsumableDecider
         @Override
         public InputConsumableDecider createInstance(
                 SchedulingTopology schedulingTopology,
-                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
+                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
+                Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
             return new AllFinishedInputConsumableDecider();
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
index ccd354b0d0d..6e3e2fec4b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import java.util.Map;
@@ -40,12 +41,16 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider {
 
     private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever;
 
+    private final Function<ExecutionVertexID, ExecutionState> executionStateRetriever;
+
     DefaultInputConsumableDecider(
             Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
             Function<IntermediateResultPartitionID, SchedulingResultPartition>
-                    resultPartitionRetriever) {
+                    resultPartitionRetriever,
+            Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
         this.scheduledVertexRetriever = scheduledVertexRetriever;
         this.resultPartitionRetriever = resultPartitionRetriever;
+        this.executionStateRetriever = executionStateRetriever;
     }
 
     @Override
@@ -86,7 +91,12 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider {
                 ExecutionVertexID producerVertex =
                         resultPartitionRetriever.apply(partitionId).getProducer().getId();
                 if (!verticesToSchedule.contains(producerVertex)
-                        && !scheduledVertexRetriever.apply(producerVertex)) {
+                        && !scheduledVertexRetriever.apply(producerVertex)
+                        // For jm failover: the producer can be transitioned to FINISHED state not
+                        // touched by scheduling strategy. This means all producer
+                        // partitions finished, so we can schedule the downstream execution.
+                        && executionStateRetriever.apply(producerVertex)
+                                != ExecutionState.FINISHED) {
                     return false;
                 }
             }
@@ -112,9 +122,12 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider {
         @Override
         public InputConsumableDecider createInstance(
                 SchedulingTopology schedulingTopology,
-                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
+                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
+                Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
             return new DefaultInputConsumableDecider(
-                    scheduledVertexRetriever, schedulingTopology::getResultPartition);
+                    scheduledVertexRetriever,
+                    schedulingTopology::getResultPartition,
+                    executionStateRetriever);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
index 1d19dd2cf62..58bc635a6cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
+import org.apache.flink.runtime.execution.ExecutionState;
+
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -53,6 +55,7 @@ public interface InputConsumableDecider {
     interface Factory {
         InputConsumableDecider createInstance(
                 SchedulingTopology schedulingTopology,
-                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever);
+                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
+                Function<ExecutionVertexID, ExecutionState> executionStateRetriever);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
index df7c353e94e..4043608ffc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
+import org.apache.flink.runtime.execution.ExecutionState;
+
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -75,7 +77,8 @@ public class PartialFinishedInputConsumableDecider implements InputConsumableDec
         @Override
         public InputConsumableDecider createInstance(
                 SchedulingTopology schedulingTopology,
-                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
+                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
+                Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
             return new PartialFinishedInputConsumableDecider();
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
index 7be6922b935..9ddef15e560 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -68,7 +68,10 @@ public class VertexwiseSchedulingStrategy
         this.schedulingTopology = checkNotNull(schedulingTopology);
         this.inputConsumableDecider =
                 inputConsumableDeciderFactory.createInstance(
-                        schedulingTopology, scheduledVertices::contains);
+                        schedulingTopology,
+                        scheduledVertices::contains,
+                        (executionVertexId) ->
+                                schedulingTopology.getVertex(executionVertexId).getState());
         LOG.info(
                 "Using InputConsumableDecider {} for VertexwiseSchedulingStrategy.",
                 inputConsumableDecider.getClass().getName());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
index af61b1e3676..ac6bd65d33c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
@@ -206,6 +206,8 @@ class DefaultInputConsumableDeciderTest {
     private DefaultInputConsumableDecider createDefaultInputConsumableDecider(
             Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) {
         return new DefaultInputConsumableDecider(
-                scheduledVertices::contains, schedulingTopology::getResultPartition);
+                scheduledVertices::contains,
+                schedulingTopology::getResultPartition,
+                (id) -> schedulingTopology.getVertex(id).getState());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
index ea0023ddac1..39a51c9962f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java
@@ -282,6 +282,6 @@ class VertexwiseSchedulingStrategyTest {
         return new VertexwiseSchedulingStrategy(
                 testingSchedulerOperation,
                 schedulingTopology,
-                (ignore1, ignore2) -> inputConsumableDecider);
+                (ignore1, ignore2, ignore3) -> inputConsumableDecider);
     }
 }

Reply via email to