This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6133aecd7a513236c190b7b47c073a371d1c36c1
Author: Yi Zhang <[email protected]>
AuthorDate: Thu Mar 5 14:04:47 2026 +0800

    [hotfix][runtime] Associate job with application after successful JobManagerRunner creation
---
 .../flink/runtime/dispatcher/Dispatcher.java | 25 ++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 22a3ee2c444..d7cdfb7c417 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -820,7 +820,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                     jobName,
                     jobId,
                     application.getApplicationId());
-            application.addJob(jobId);
         } else {
             // This can only occur in tests for submitJob that submit jobs without an application
             log.warn("Submitting job '{}' ({}) without associated application.", jobName, jobId);
@@ -842,6 +841,13 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                         executionPlan.getJobID()));
     }
 
+    private void associateJobWithApplication(JobID jobId, ApplicationID applicationId) {
+        checkNotNull(applicationId);
+        checkState(applications.containsKey(applicationId));
+
+        applications.get(applicationId).addJob(jobId);
+    }
+
     private Optional<AbstractApplication> getApplicationForJob(ExecutionPlan executionPlan) {
         return executionPlan
                 .getApplicationId()
@@ -887,7 +893,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     private void persistAndRunJob(ExecutionPlan executionPlan) throws Exception {
         executionPlanWriter.putExecutionPlan(executionPlan);
         initJobClientExpiredTime(executionPlan);
-        runJob(createJobMasterRunner(executionPlan), ExecutionType.SUBMISSION);
+        runJob(
+                createJobMasterRunner(executionPlan),
+                ExecutionType.SUBMISSION,
+                executionPlan.getApplicationId().orElse(null));
     }
 
     private JobManagerRunner createJobMasterRunner(ExecutionPlan executionPlan) throws Exception {
@@ -928,6 +937,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
 
     private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType)
             throws Exception {
+        runJob(jobManagerRunner, executionType, null);
+    }
+
+    private void runJob(
+            JobManagerRunner jobManagerRunner,
+            ExecutionType executionType,
+            @Nullable ApplicationID applicationId)
+            throws Exception {
         jobManagerRunner.start();
         jobManagerRunnerRegistry.register(jobManagerRunner);
 
@@ -935,6 +952,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
 
         partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>());
 
+        if (applicationId != null) {
+            associateJobWithApplication(jobId, applicationId);
+        }
+
         final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
                 jobManagerRunner
                         .getResultFuture()
