This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fafd86cd648e0b31e3dc78f6426616a9d2c671c5
Author:     JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Dec 27 10:57:54 2024 +0800

    [FLINK-36850][runtime] Disable adaptive batch execution when batch job progress recovery is enabled.
---
 .../AdaptiveBatchSchedulerFactory.java           |  5 +-
 .../AdaptiveExecutionHandlerFactory.java         | 16 +++++-
 .../flink/test/scheduling/JMFailoverITCase.java  | 65 +++++++++++-----------
 3 files changed, 52 insertions(+), 34 deletions(-)
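For context, batch job progress recovery is the JobMaster-failover feature that
this commit treats as mutually exclusive with adaptive batch execution. Below is
a minimal sketch of a batch job that would now take the non-adaptive path,
assuming the feature is toggled by BatchExecutionOptions.JOB_RECOVERY_ENABLED
(key: execution.batch.job-recovery.enabled) as in current flink-runtime; the
class name and pipeline are illustrative only:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.configuration.BatchExecutionOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobRecoveryEnabledSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // With this flag set, the scheduler now falls back to a
            // NonAdaptiveExecutionHandler (see the factory change below).
            conf.set(BatchExecutionOptions.JOB_RECOVERY_ENABLED, true);

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            env.fromData(1, 2, 3).map(i -> i * 2).print();
            env.execute("batch job with progress recovery enabled");
        }
    }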
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index f445c901593..8c8e72059a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -269,7 +269,10 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
 
         AdaptiveExecutionHandler adaptiveExecutionHandler =
                 AdaptiveExecutionHandlerFactory.create(
-                        executionPlan, userCodeLoader, futureExecutor);
+                        executionPlan,
+                        jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler,
+                        userCodeLoader,
+                        futureExecutor);
 
         return new AdaptiveBatchScheduler(
                 log,
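Note on the new second argument: rather than re-reading the configuration, the
factory derives the flag from the runtime type of the recovery handler the
scheduler was wired with. A one-line restatement of that contract; that a no-op
handler is injected when recovery is disabled is an assumption about code
outside this diff:

    // Recovery is considered enabled iff the scheduler holds the real recovery
    // handler (assumed: a no-op handler is used when the feature is off).
    boolean enableBatchJobRecovery =
            jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler;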
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
index 2d7be76c729..815eee65a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java
@@ -36,8 +36,14 @@ public class AdaptiveExecutionHandlerFactory {
     /**
      * Creates an instance of {@link AdaptiveExecutionHandler} based on the provided execution plan.
      *
+     * <p>TODO: Currently, adaptive execution cannot work with batch job progress recovery, so we
+     * always use {@link NonAdaptiveExecutionHandler} if batch job recovery is enabled. This
+     * limitation will be removed in the future when we adapt adaptive batch execution to batch job
+     * recovery.
+     *
      * @param executionPlan The execution plan, which can be either a {@link JobGraph} or a {@link
      *     StreamGraph}.
+     * @param enableBatchJobRecovery Whether to enable batch job recovery.
      * @param userClassLoader The class loader for the user code.
      * @param serializationExecutor The executor used for serialization tasks.
      * @return An instance of {@link AdaptiveExecutionHandler}.
@@ -46,6 +52,7 @@ public class AdaptiveExecutionHandlerFactory {
      */
     public static AdaptiveExecutionHandler create(
             ExecutionPlan executionPlan,
+            boolean enableBatchJobRecovery,
             ClassLoader userClassLoader,
             Executor serializationExecutor)
             throws DynamicCodeLoadingException {
@@ -53,8 +60,13 @@ public class AdaptiveExecutionHandlerFactory {
             return new NonAdaptiveExecutionHandler((JobGraph) executionPlan);
         } else {
             checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
-            return new DefaultAdaptiveExecutionHandler(
-                    userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
+            if (enableBatchJobRecovery) {
+                StreamGraph streamGraph = (StreamGraph) executionPlan;
+                return new NonAdaptiveExecutionHandler(streamGraph.getJobGraph(userClassLoader));
+            } else {
+                return new DefaultAdaptiveExecutionHandler(
+                        userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
+            }
         }
     }
 }
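The resulting selection logic, summarized as a hedged usage sketch (names and
signature as in the diff above; imports elided, and the surrounding wiring is
illustrative rather than the factory's exact call site):

    // Which handler create(...) now returns:
    //   JobGraph plan                       -> NonAdaptiveExecutionHandler
    //   StreamGraph + recovery enabled      -> NonAdaptiveExecutionHandler,
    //       after eagerly compiling the graph via getJobGraph(userClassLoader)
    //   StreamGraph + recovery disabled     -> DefaultAdaptiveExecutionHandler
    AdaptiveExecutionHandler handler =
            AdaptiveExecutionHandlerFactory.create(
                    executionPlan,
                    /* enableBatchJobRecovery */ true,
                    userClassLoader,
                    serializationExecutor);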
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
index add680b5ca4..2c2aa501813 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
@@ -41,13 +41,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.io.network.partition.PartitionedFile;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -57,7 +56,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -165,12 +163,12 @@ class JMFailoverITCase {
 
     @Test
     void testRecoverFromJMFailover() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking all sink
         StubRecordSink.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until sink is running.
         tryWaitUntilCondition(() -> StubRecordSink.attemptIds.size() > 0);
@@ -188,21 +186,25 @@ class JMFailoverITCase {
 
     @Test
     void testSourceNotAllFinished() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking source 0
         SourceTail.blockSubTasks(0);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until source is running.
         tryWaitUntilCondition(() -> SourceTail.attemptIds.size() == SOURCE_PARALLELISM);
 
-        JobVertex source = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
         while (true) {
             AccessExecutionGraph executionGraph = flinkCluster.getExecutionGraph(jobId).get();
+            AccessExecutionJobVertex source =
+                    executionGraph.getAllVertices().values().stream()
+                            .filter(jobVertex -> jobVertex.getName().contains("Source"))
+                            .findFirst()
+                            .get();
             long finishedTasks =
-                    Arrays.stream(executionGraph.getJobVertex(source.getID()).getTaskVertices())
+                    Arrays.stream(source.getTaskVertices())
                             .filter(task -> task.getExecutionState() == ExecutionState.FINISHED)
                             .count();
             if (finishedTasks == SOURCE_PARALLELISM - 1) {
@@ -228,12 +230,12 @@ class JMFailoverITCase {
         Configuration configuration = new Configuration();
         configuration.set(
                 BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, Duration.ZERO);
-        JobGraph jobGraph = prepareEnvAndGetJobGraph(configuration);
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph(configuration);
 
         // blocking all sink
         StubRecordSink.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until sink is running.
         tryWaitUntilCondition(() -> StubRecordSink.attemptIds.size() > 0);
@@ -251,12 +253,12 @@ class JMFailoverITCase {
 
     @Test
     void testPartitionNotFoundTwice() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking map 0 and map 1.
         StubMapFunction.blockSubTasks(0, 1);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction.attemptIds.size() > 0);
@@ -286,12 +288,12 @@ class JMFailoverITCase {
 
     @Test
     void testPartitionNotFoundAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph(false);
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph(false);
 
         // blocking all map task
         StubMapFunction2.blockSubTasks(0, 1, 2, 3);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction2.attemptIds.size() > 0);
@@ -312,12 +314,12 @@ class JMFailoverITCase {
 
     @Test
     void testPartitionNotFoundAndOperatorCoordinatorSupportBatchSnapshot() throws Exception {
-        JobGraph jobGraph = prepareEnvAndGetJobGraph();
+        StreamGraph streamGraph = prepareEnvAndGetStreamGraph();
 
         // blocking map 0.
         StubMapFunction.blockSubTasks(0);
 
-        JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();
+        JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();
 
         // wait until map deploying, which indicates all source finished.
         tryWaitUntilCondition(() -> StubMapFunction.attemptIds.size() > 0);
@@ -336,28 +338,29 @@ class JMFailoverITCase {
         checkCountResults();
     }
 
-    private JobGraph prepareEnvAndGetJobGraph() throws Exception {
+    private StreamGraph prepareEnvAndGetStreamGraph() throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(
                 BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                 previousWorkerRecoveryTimeout);
-        return prepareEnvAndGetJobGraph(configuration, true);
+        return prepareEnvAndGetStreamGraph(configuration, true);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(Configuration config) throws Exception {
-        return prepareEnvAndGetJobGraph(config, true);
+    private StreamGraph prepareEnvAndGetStreamGraph(Configuration config) throws Exception {
+        return prepareEnvAndGetStreamGraph(config, true);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(boolean operatorCoordinatorsSupportsBatchSnapshot)
-            throws Exception {
+    private StreamGraph prepareEnvAndGetStreamGraph(
+            boolean operatorCoordinatorsSupportsBatchSnapshot) throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(
                 BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                 previousWorkerRecoveryTimeout);
-        return prepareEnvAndGetJobGraph(configuration, operatorCoordinatorsSupportsBatchSnapshot);
+        return prepareEnvAndGetStreamGraph(
+                configuration, operatorCoordinatorsSupportsBatchSnapshot);
     }
 
-    private JobGraph prepareEnvAndGetJobGraph(
+    private StreamGraph prepareEnvAndGetStreamGraph(
             Configuration config, boolean operatorCoordinatorsSupportsBatchSnapshot)
             throws Exception {
         flinkCluster =
@@ -371,8 +374,8 @@ class JMFailoverITCase {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 
         return operatorCoordinatorsSupportsBatchSnapshot
-                ? createJobGraph(env, methodName)
-                : createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, methodName);
+                ? createStreamGraph(env, methodName)
+                : createStreamGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, methodName);
     }
 
     private TestingMiniClusterConfiguration getMiniClusterConfiguration(Configuration config)
@@ -446,7 +449,7 @@ class JMFailoverITCase {
                 new File(flinkConfiguration.get(CoreOptions.TMP_DIRS)));
     }
 
-    private JobGraph createJobGraph(StreamExecutionEnvironment env, String jobName) {
+    private StreamGraph createStreamGraph(StreamExecutionEnvironment env, String jobName) {
         TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo =
                 new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
@@ -468,10 +471,10 @@ class JMFailoverITCase {
         streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        return streamGraph;
     }
 
-    private JobGraph createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
+    private StreamGraph createStreamGraphWithUnsupportedBatchSnapshotOperatorCoordinator(
             StreamExecutionEnvironment env, String jobName) throws Exception {
 
         TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo =
@@ -500,7 +503,7 @@ class JMFailoverITCase {
         streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        return streamGraph;
     }
 
     private static void setSubtaskBlocked(
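The test changes all follow one mechanical pattern: submit the StreamGraph
itself and let the scheduler compile it, instead of pre-generating a JobGraph
in the test. Roughly (both shapes taken from the diff above; flinkCluster is
the test's TestingMiniCluster field):

    // Before: the test compiled the graph itself and pinned the JobGraph shape.
    JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
    JobID jobId = flinkCluster.submitJob(jobGraph).get().getJobID();

    // After: the StreamGraph is submitted directly; compilation happens inside
    // the scheduler, through whichever AdaptiveExecutionHandler the factory picks.
    JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID();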