This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 662e2be630da19d44117e6c75fb6964ec0eafc61 Author: Weijie Guo <res...@163.com> AuthorDate: Mon Dec 23 11:02:47 2024 +0800 [FLINK-36880][network] Handle the finished vertex in InputConsumableDecider --- .../strategy/AllFinishedInputConsumableDecider.java | 5 ++++- .../strategy/DefaultInputConsumableDecider.java | 21 +++++++++++++++++---- .../scheduler/strategy/InputConsumableDecider.java | 5 ++++- .../PartialFinishedInputConsumableDecider.java | 5 ++++- .../strategy/VertexwiseSchedulingStrategy.java | 5 ++++- .../strategy/DefaultInputConsumableDeciderTest.java | 4 +++- .../strategy/VertexwiseSchedulingStrategyTest.java | 2 +- 7 files changed, 37 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java index f8cbb260488..1c4020afd32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; + import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -60,7 +62,8 @@ public class AllFinishedInputConsumableDecider implements InputConsumableDecider @Override public InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) { + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever, + Function<ExecutionVertexID, ExecutionState> executionStateRetriever) { return new AllFinishedInputConsumableDecider(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java index ccd354b0d0d..6e3e2fec4b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import java.util.Map; @@ -40,12 +41,16 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider { private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever; + private final Function<ExecutionVertexID, ExecutionState> executionStateRetriever; + DefaultInputConsumableDecider( Function<ExecutionVertexID, Boolean> scheduledVertexRetriever, Function<IntermediateResultPartitionID, SchedulingResultPartition> - resultPartitionRetriever) { + resultPartitionRetriever, + Function<ExecutionVertexID, ExecutionState> executionStateRetriever) { this.scheduledVertexRetriever = scheduledVertexRetriever; this.resultPartitionRetriever = resultPartitionRetriever; + this.executionStateRetriever = executionStateRetriever; } @Override @@ -86,7 +91,12 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider { ExecutionVertexID producerVertex = resultPartitionRetriever.apply(partitionId).getProducer().getId(); if (!verticesToSchedule.contains(producerVertex) - && !scheduledVertexRetriever.apply(producerVertex)) { + && !scheduledVertexRetriever.apply(producerVertex) + // For jm failover: the producer can be transitioned to FINISHED state not + // touched by scheduling strategy. This means all producer + // partitions finished, so we can schedule the downstream execution. + && executionStateRetriever.apply(producerVertex) + != ExecutionState.FINISHED) { return false; } } @@ -112,9 +122,12 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider { @Override public InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) { + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever, + Function<ExecutionVertexID, ExecutionState> executionStateRetriever) { return new DefaultInputConsumableDecider( - scheduledVertexRetriever, schedulingTopology::getResultPartition); + scheduledVertexRetriever, + schedulingTopology::getResultPartition, + executionStateRetriever); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java index 1d19dd2cf62..58bc635a6cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; + import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -53,6 +55,7 @@ public interface InputConsumableDecider { interface Factory { InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function<ExecutionVertexID, Boolean> scheduledVertexRetriever); + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever, + Function<ExecutionVertexID, ExecutionState> executionStateRetriever); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java index df7c353e94e..4043608ffc5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; + import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -75,7 +77,8 @@ public class PartialFinishedInputConsumableDecider implements InputConsumableDec @Override public InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) { + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever, + Function<ExecutionVertexID, ExecutionState> executionStateRetriever) { return new PartialFinishedInputConsumableDecider(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java index 7be6922b935..9ddef15e560 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java @@ -68,7 +68,10 @@ public class VertexwiseSchedulingStrategy this.schedulingTopology = checkNotNull(schedulingTopology); this.inputConsumableDecider = inputConsumableDeciderFactory.createInstance( - schedulingTopology, scheduledVertices::contains); + schedulingTopology, + scheduledVertices::contains, + (executionVertexId) -> + schedulingTopology.getVertex(executionVertexId).getState()); LOG.info( "Using InputConsumableDecider {} for VertexwiseSchedulingStrategy.", inputConsumableDecider.getClass().getName()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java index af61b1e3676..ac6bd65d33c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java @@ -206,6 +206,8 @@ class DefaultInputConsumableDeciderTest { private DefaultInputConsumableDecider createDefaultInputConsumableDecider( Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) { return new DefaultInputConsumableDecider( - scheduledVertices::contains, schedulingTopology::getResultPartition); + scheduledVertices::contains, + schedulingTopology::getResultPartition, + (id) -> schedulingTopology.getVertex(id).getState()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java index ea0023ddac1..39a51c9962f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java @@ -282,6 +282,6 @@ class VertexwiseSchedulingStrategyTest { return new VertexwiseSchedulingStrategy( testingSchedulerOperation, schedulingTopology, - (ignore1, ignore2) -> inputConsumableDecider); + (ignore1, ignore2, ignore3) -> inputConsumableDecider); } }