eemario commented on code in PR #27686:
URL: https://github.com/apache/flink/pull/27686#discussion_r2871993722


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java:
##########
@@ -68,13 +69,22 @@ public class JobResult implements Serializable {
     /** Stores the cause of the job failure, or {@code null} if the job finished successfully. */
     @Nullable private final SerializedThrowable serializedThrowable;
 
+    @Nullable private final ApplicationID applicationId;

Review Comment:
   Updated rest_v1_dispatcher.yml to reflect the change.



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java:
##########
@@ -573,12 +584,12 @@ private void runApplicationEntryPoint(
                     getApplicationId(),
                     getAllRecoveredJobInfos());
 
-            if (applicationJobIds.isEmpty()) {
+            if (submittedJobIds.isEmpty()) {

Review Comment:
   Yes, the list contains all jobs. The variable has been renamed back to 
`applicationJobIds`, and explanatory comments have been added.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -685,12 +739,35 @@ public void notifyApplicationStatusChange(
                     "Application %s does not exist.",
                     applicationId);
             checkState(
-                    !applicationArchivingFutures.containsKey(applicationId),
-                    "The application (" + applicationId + ") has already been archived.");
+                    !applicationTerminationFutures.get(applicationId).isDone(),
+                    "The application (" + applicationId + ") has already terminated.");
+
+            AbstractApplication application = applications.get(applicationId);
+            Set<JobID> remainingRecoveredJobIds =

Review Comment:
   Updated the variable name.
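
   For readers following the hunk above, the termination guard can be sketched in isolation. This is a minimal illustration, not the Dispatcher code: `TerminationGuardSketch`, `markTerminated`, and the `String` keys are hypothetical stand-ins (the PR keys the map by `ApplicationID`), and only the put-on-submit and `isDone()` check are taken from the diff.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the per-application termination tracking shown in the diff. */
final class TerminationGuardSketch {
    // Assumption: String keys replace ApplicationID to keep the sketch self-contained.
    private final Map<String, CompletableFuture<Void>> terminationFutures =
            new ConcurrentHashMap<>();

    /** Registers a pending termination future at submission time, as the diff does. */
    void submit(String applicationId) {
        terminationFutures.put(applicationId, new CompletableFuture<>());
    }

    /** Hypothetical helper: completes the future once the application has terminated. */
    void markTerminated(String applicationId) {
        terminationFutures.get(applicationId).complete(null);
    }

    /** Rejects status changes for unknown or already-terminated applications. */
    void notifyStatusChange(String applicationId) {
        CompletableFuture<Void> future = terminationFutures.get(applicationId);
        if (future == null) {
            throw new IllegalStateException(
                    "Application " + applicationId + " does not exist.");
        }
        if (future.isDone()) {
            throw new IllegalStateException(
                    "The application (" + applicationId + ") has already terminated.");
        }
        // The actual status handling would go here.
    }
}
```

   Checking `isDone()` on a future that is registered at submission distinguishes "still running" from "terminated", whereas the earlier `containsKey` check on the archiving map could only tell whether archiving had started.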



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -664,11 +713,16 @@ private CompletableFuture<Acknowledge> internalSubmitApplication(
         log.info("Submitting application '{}' ({}).", application.getName(), applicationId);
 
         applications.put(applicationId, application);
-        Set<JobID> jobs = recoveredApplicationJobIds.remove(applicationId);
-        if (jobs != null) {
-            jobs.forEach(application::addJob);
-        }
         application.registerStatusListener(this);
+        applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
+
+        // cleanup dirty job results for the application

Review Comment:
   The cleanup is performed during application submission so that the JobClient 
is available when the application execution skips resubmitting 
already-terminated jobs and retrieves the JobClient directly. The comments have 
been updated for clarity.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -624,6 +648,31 @@ public CompletableFuture<Acknowledge> submitJob(ExecutionPlan executionPlan, Dur
                         getMainThreadExecutor(jobID));
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> recoverJob(JobID jobId, Duration timeout) {
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+            log.info("Received job recovery request for job {}.", jobId);
+        }
+        final ExecutionPlan executionPlan = recoveredJobs.remove(jobId);
+        if (executionPlan == null) {
+            return FutureUtils.completedExceptionally(
+                    new JobSubmissionException(jobId, "Cannot find the recovered job."));
+        }
+
+        final ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);
+        checkState(recoveredJobIdsByApplicationId.containsKey(applicationId));
+
+        runRecoveredJob(executionPlan, false);
+
+        Set<JobID> jobIds = recoveredJobIdsByApplicationId.get(applicationId);

Review Comment:
   Updated the variable names.
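
   The bookkeeping in the `recoverJob` hunk can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `RecoveredJobsSketch`, `addRecovered`, `remaining`, and the `String` IDs are hypothetical, and because the diff is cut off right after the `get(applicationId)` call, removing the job ID from the set and dropping the map entry once the set is empty are guesses at what follows.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of tracking recovered jobs per application. */
final class RecoveredJobsSketch {
    // Assumption: jobId -> applicationId stands in for the recovered ExecutionPlan.
    private final Map<String, String> recoveredJobs = new HashMap<>();
    private final Map<String, Set<String>> recoveredJobIdsByApplicationId = new HashMap<>();

    /** Hypothetical helper: records a job found during recovery. */
    void addRecovered(String applicationId, String jobId) {
        recoveredJobs.put(jobId, applicationId);
        recoveredJobIdsByApplicationId
                .computeIfAbsent(applicationId, id -> new HashSet<>())
                .add(jobId);
    }

    /** Returns false when the job is unknown, mirroring the "Cannot find" branch. */
    boolean recoverJob(String jobId) {
        String applicationId = recoveredJobs.remove(jobId);
        if (applicationId == null) {
            return false;
        }
        Set<String> jobIds = recoveredJobIdsByApplicationId.get(applicationId);
        if (jobIds == null) {
            throw new IllegalStateException("No recovered jobs tracked for " + applicationId);
        }
        // Assumption: the recovered job is no longer pending for its application.
        jobIds.remove(jobId);
        if (jobIds.isEmpty()) {
            // Assumption: an application with no pending jobs is dropped from the map.
            recoveredJobIdsByApplicationId.remove(applicationId);
        }
        return true;
    }

    /** Job IDs of the application that have not been recovered yet. */
    Set<String> remaining(String applicationId) {
        return recoveredJobIdsByApplicationId.getOrDefault(
                applicationId, Collections.emptySet());
    }
}
```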



