This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6133aecd7a513236c190b7b47c073a371d1c36c1
Author: Yi Zhang <[email protected]>
AuthorDate: Thu Mar 5 14:04:47 2026 +0800

    [hotfix][runtime] Associate job with application after successful 
JobManagerRunner creation
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 25 ++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 22a3ee2c444..d7cdfb7c417 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -820,7 +820,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                     jobName,
                     jobId,
                     application.getApplicationId());
-            application.addJob(jobId);
         } else {
             // This can only occur in tests for submitJob that submit jobs 
without an application
             log.warn("Submitting job '{}' ({}) without associated 
application.", jobName, jobId);
@@ -842,6 +841,13 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                                         executionPlan.getJobID()));
     }
 
+    private void associateJobWithApplication(JobID jobId, ApplicationID 
applicationId) {
+        checkNotNull(applicationId);
+        checkState(applications.containsKey(applicationId));
+
+        applications.get(applicationId).addJob(jobId);
+    }
+
     private Optional<AbstractApplication> getApplicationForJob(ExecutionPlan 
executionPlan) {
         return executionPlan
                 .getApplicationId()
@@ -887,7 +893,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     private void persistAndRunJob(ExecutionPlan executionPlan) throws 
Exception {
         executionPlanWriter.putExecutionPlan(executionPlan);
         initJobClientExpiredTime(executionPlan);
-        runJob(createJobMasterRunner(executionPlan), ExecutionType.SUBMISSION);
+        runJob(
+                createJobMasterRunner(executionPlan),
+                ExecutionType.SUBMISSION,
+                executionPlan.getApplicationId().orElse(null));
     }
 
     private JobManagerRunner createJobMasterRunner(ExecutionPlan 
executionPlan) throws Exception {
@@ -928,6 +937,14 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
     private void runJob(JobManagerRunner jobManagerRunner, ExecutionType 
executionType)
             throws Exception {
+        runJob(jobManagerRunner, executionType, null);
+    }
+
+    private void runJob(
+            JobManagerRunner jobManagerRunner,
+            ExecutionType executionType,
+            @Nullable ApplicationID applicationId)
+            throws Exception {
         jobManagerRunner.start();
         jobManagerRunnerRegistry.register(jobManagerRunner);
 
@@ -935,6 +952,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
         partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>());
 
+        if (applicationId != null) {
+            associateJobWithApplication(jobId, applicationId);
+        }
+
         final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
                 jobManagerRunner
                         .getResultFuture()

Reply via email to