This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fafd86cd648e0b31e3dc78f6426616a9d2c671c5
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Dec 27 10:57:54 2024 +0800

    [FLINK-36850][runtime] Disable adaptive batch execution when batch job progress recovery is enabled.
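
    Adaptive batch execution cannot yet work together with batch job progress
    recovery, so AdaptiveExecutionHandlerFactory now falls back to the
    NonAdaptiveExecutionHandler whenever recovery is enabled; the scheduler
    factory derives the flag from "jobRecoveryHandler instanceof
    DefaultBatchJobRecoveryHandler". A condensed sketch of the new dispatch
    (taken from the patch below, with the StreamGraph type check omitted for
    brevity):

        if (executionPlan instanceof JobGraph) {
            return new NonAdaptiveExecutionHandler((JobGraph) executionPlan);
        } else if (enableBatchJobRecovery) {
            // Recovery enabled: compile the StreamGraph to a JobGraph up
            // front and use the non-adaptive handler until adaptive batch
            // execution supports job progress recovery.
            StreamGraph streamGraph = (StreamGraph) executionPlan;
            return new NonAdaptiveExecutionHandler(
                    streamGraph.getJobGraph(userClassLoader));
        } else {
            return new DefaultAdaptiveExecutionHandler(
                    userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
        }

    JMFailoverITCase accordingly submits StreamGraphs instead of pre-compiled
    JobGraphs, so the failover tests exercise the scheduler-side decision.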
---
 .../AdaptiveBatchSchedulerFactory.java             |  5 +-
 .../AdaptiveExecutionHandlerFactory.java           | 16 +++++-
 .../flink/test/scheduling/JMFailoverITCase.java    | 65 +++++++++++-----------
 3 files changed, 52 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index f445c901593..8c8e72059a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -269,7 +269,10 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
 
         AdaptiveExecutionHandler adaptiveExecutionHandler =
                 AdaptiveExecutionHandlerFactory.create(
-                        executionPlan, userCodeLoader, futureExecutor);
+                        executionPlan,
+                        jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler,
+                        userCodeLoader,
+                        futureExecutor);
 
         return new AdaptiveBatchScheduler(
                 log,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
index 2d7be76c729..815eee65a46 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
@@ -36,8 +36,14 @@ public class AdaptiveExecutionHandlerFactory {
     /**
     * Creates an instance of {@link AdaptiveExecutionHandler} based on the provided execution plan.
      *
+     * <p>TODO: Currently, adaptive execution cannot work with batch job progress recovery, so we
+     * always use {@link NonAdaptiveExecutionHandler} if batch job recovery is enabled. This
+     * limitation will be removed in the future when we adapt adaptive batch execution to batch job
+     * recovery.
+     *
     * @param executionPlan The execution plan, which can be either a {@link JobGraph} or a {@link
      *     StreamGraph}.
+     * @param enableBatchJobRecovery Whether to enable batch job recovery.
      * @param userClassLoader The class loader for the user code.
      * @param serializationExecutor The executor used for serialization tasks.
      * @return An instance of {@link AdaptiveExecutionHandler}.
@@ -46,6 +52,7 @@ public class AdaptiveExecutionHandlerFactory {
      */
     public static AdaptiveExecutionHandler create(
             ExecutionPlan executionPlan,
+            boolean enableBatchJobRecovery,
             ClassLoader userClassLoader,
             Executor serializationExecutor)
             throws DynamicCodeLoadingException {
@@ -53,8 +60,13 @@ public class AdaptiveExecutionHandlerFactory {
             return new NonAdaptiveExecutionHandler((JobGraph) executionPlan);
         } else {
            checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
-            return new DefaultAdaptiveExecutionHandler(
-                    userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
+            if (enableBatchJobRecovery) {
+                StreamGraph streamGraph = (StreamGraph) executionPlan;
+                return new NonAdaptiveExecutionHandler(streamGraph.getJobGraph(userClassLoader));
+            } else {
+                return new DefaultAdaptiveExecutionHandler(
+                        userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
+            }
         }
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
index add680b5ca4..2c2aa501813 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
@@ -41,13 +41,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.io.network.partition.PartitionedFile;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -57,7 +56,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -165,12 +163,12 @@ class JMFailoverITCase {
 
     @Test
     void testRecoverFromJMFailover() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking all sink
         StubRecordSink.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until sink is running.
         tryWaitUntilCondition(() -> StubRecordSink.attemptIds.size() > 0);
@@ -188,21 +186,25 @@ class JMFailoverITCase {
 
     @Test
     void testSourceNotAllFinished() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking source 0
         SourceTail.blockSubTasks(0);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until source is running.
        tryWaitUntilCondition(() -> SourceTail.attemptIds.size() == SOURCE_PARALLELISM);
 
-        JobVertex source = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
         while (true) {
            AccessExecutionGraph executionGraph = flinkCluster.getExecutionGraph(jobId).get();
+            AccessExecutionJobVertex source =
+                    executionGraph.getAllVertices().values().stream()
+                            .filter(jobVertex -> jobVertex.getName().contains("Source"))
+                            .findFirst()
+                            .get();
             long finishedTasks =
-                    Arrays.stream(executionGraph.getJobVertex(source.getID()).getTaskVertices())
+                    Arrays.stream(source.getTaskVertices())
                            .filter(task -> task.getExecutionState() == ExecutionState.FINISHED)
                             .count();
             if (finishedTasks == SOURCE_PARALLELISM - 1) {
@@ -228,12 +230,12 @@ class JMFailoverITCase {
         Configuration configuration = new Configuration();
        configuration.set(
                BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, Duration.ZERO);
-        JobGraph jobGraph = prepareEnvAndGetJobGraph(configuration);
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph(configuration);
 
         // blocking all sink
         StubRecordSink.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until sink is running.
         tryWaitUntilCondition(() -> StubRecordSink.attemptIds.size() > 0);
@@ -251,12 +253,12 @@ class JMFailoverITCase {
 
     @Test
     void testPartitionNotFoundTwice() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking map 0 and map 1.
         StubMapFunction.blockSubTasks(0, 1);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction.attemptIds.size() > 0);
@@ -286,12 +288,12 @@ class JMFailoverITCase {
 
     @Test
    void testPartitionNotFoundAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph(false);
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph(false);
 
         // blocking all map task
         StubMapFunction2.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction2.attemptIds.size() > 0);
@@ -312,12 +314,12 @@ class JMFailoverITCase {
 
     @Test
    void testPartitionNotFoundAndOperatorCoordinatorSupportBatchSnapshot() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking map 0.
         StubMapFunction.blockSubTasks(0);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction.attemptIds.size() > 0);
@@ -336,28 +338,29 @@ class JMFailoverITCase {
         checkCountResults();
     }
 
-    private JobGraph prepareEnvAndGetJobGraph() throws Exception {
+    private StreamGraph prepareEnvAndGetStreamGraph() throws Exception {
         Configuration configuration = new Configuration();
        configuration.set(
                BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                 previousWorkerRecoveryTimeout);
-        return prepareEnvAndGetJobGraph(configuration, true);
+        return prepareEnvAndGetStreamGraph(configuration, true);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(Configuration config) throws Exception {
-        return prepareEnvAndGetJobGraph(config, true);
+    private StreamGraph prepareEnvAndGetStreamGraph(Configuration config) throws Exception {
+        return prepareEnvAndGetStreamGraph(config, true);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(boolean operatorCoordinatorsSupportsBatchSnapshot)
-            throws Exception {
+    private StreamGraph prepareEnvAndGetStreamGraph(
+            boolean operatorCoordinatorsSupportsBatchSnapshot) throws Exception {
         Configuration configuration = new Configuration();
        configuration.set(
                BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                 previousWorkerRecoveryTimeout);
-        return prepareEnvAndGetJobGraph(configuration, operatorCoordinatorsSupportsBatchSnapshot);
+        return prepareEnvAndGetStreamGraph(
+                configuration, operatorCoordinatorsSupportsBatchSnapshot);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(
+    private StreamGraph prepareEnvAndGetStreamGraph(
            Configuration config, boolean operatorCoordinatorsSupportsBatchSnapshot)
             throws Exception {
         flinkCluster =
@@ -371,8 +374,8 @@ class JMFailoverITCase {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 
         return operatorCoordinatorsSupportsBatchSnapshot
-                ? createJobGraph(env, methodName)
-                : createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, methodName);
+                ? createStreamGraph(env, methodName)
+                : createStreamGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, methodName);
     }
 
    private TestingMiniClusterConfiguration getMiniClusterConfiguration(Configuration config)
@@ -446,7 +449,7 @@ class JMFailoverITCase {
                 new File(flinkConfiguration.get(CoreOptions.TMP_DIRS)));
     }
 
-    private JobGraph createJobGraph(StreamExecutionEnvironment env, String jobName) {
+    private StreamGraph createStreamGraph(StreamExecutionEnvironment env, String jobName) {
         TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo =
                new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
 
@@ -468,10 +471,10 @@
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        return streamGraph;
     }
 
-    private JobGraph createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
+    private StreamGraph createStreamGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
             StreamExecutionEnvironment env, String jobName) throws Exception {
 
         TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo =
@@ -500,7 +503,7 @@
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        return streamGraph;
     }
 
     private static void setSubtaskBlocked(
