zhuzhurk commented on code in PR #27362:
URL: https://github.com/apache/flink/pull/27362#discussion_r2647889934


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -691,6 +716,19 @@ private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan execution
                                         executionPlan.getJobID()));
     }
 
+    private Optional<AbstractApplication> getApplicationForJob(ExecutionPlan executionPlan) {

Review Comment:
   I guess this method should be used in `internalSubmitJob`?
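   Something along these lines (just a sketch based on the names visible in this diff):
   ```java
   // resolve the hosting application once via the new helper ...
   final Optional<AbstractApplication> application = getApplicationForJob(executionPlan);
   // ... and use `application` in the rest of the submission logic.
   ```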



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -211,8 +212,15 @@ private static CompletableFuture<JobID> submitJob(
                                 throw new CompletionException(e);
                             }
 
-                            return dispatcherGateway.submitJob(streamGraph, rpcTimeout);
+                            return internalSubmit(dispatcherGateway, streamGraph, rpcTimeout);
                         })
                 .thenApply(ack -> streamGraph.getJobID());
     }
+
+    protected CompletableFuture<Acknowledge> internalSubmit(

Review Comment:
   This can be package-private.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1036,7 +1086,23 @@ private CompletableFuture<ArchivedApplication> requestApplication(
 
         List<CompletableFuture<JobDetails>> jobDetailsFutures =
                 application.getJobs().stream()
-                        .map(jobId -> requestJobDetails(jobId, timeout))
+                        .map(
+                                jobId ->
+                                        requestJobDetails(jobId, timeout)
+                                                .exceptionally(
+                                                        t -> {
+                                                            if (t
+                                                                    instanceof
+                                                                    FlinkJobNotFoundException) {
+                                                                log.warn(
+                                                                        "Ignore job {} which may be expired when requesting application {}",

Review Comment:
   It's weird that a job is expired in the history while its hosting 
application is not expired.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -728,6 +766,17 @@ private void persistAndRunJob(ExecutionPlan executionPlan) throws Exception {
 
     private JobManagerRunner createJobMasterRunner(ExecutionPlan executionPlan) throws Exception {
         checkState(!jobManagerRunnerRegistry.isRegistered(executionPlan.getJobID()));
+
+        JobStatusListener singleJobApplication = null;
+        Optional<AbstractApplication> optionalApplication =
+                executionPlan.getApplicationId().map(applications::get);
+        if (optionalApplication.isPresent()) {
+            AbstractApplication application = optionalApplication.get();
+            if (application instanceof JobStatusListener) {

Review Comment:
   I think the clearer logic should be: if an application is a `SingleJobApplication`, it needs to be passed to the JobManager as a `JobStatusListener`.
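   A sketch of that shape (assuming `SingleJobApplication` implements `JobStatusListener`, which the current `instanceof` check implies):
   ```java
   // null if the job does not belong to a single-job application
   final JobStatusListener jobStatusListener =
           executionPlan
                   .getApplicationId()
                   .map(applications::get)
                   .filter(application -> application instanceof SingleJobApplication)
                   .map(application -> (JobStatusListener) application)
                   .orElse(null);
   ```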



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java:
##########
@@ -45,6 +46,7 @@ JobManagerRunner createJobManagerRunner(
             JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
             FatalErrorHandler fatalErrorHandler,
             Collection<FailureEnricher> failureEnrichers,
+            JobStatusListener singleJobApplication,

Review Comment:
   `singleJobApplication` -> `jobStatusListener`
   
   The handler should simply treat it as a `JobStatusListener` instead of a `SingleJobApplication`.
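   i.e. (only the renamed parameter shown, the rest of the signature as in this PR):
   ```java
   JobStatusListener jobStatusListener,
   ```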



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -418,27 +424,31 @@ private static void assertRecoveredJobsAndDirtyJobResults(
                 "There should be no overlap between the recovered 
ExecutionPlans and the passed dirty JobResults based on their job ID.");
     }
 
-    private void startRecoveredJobs() {
+    private void startRecoveredJobs(boolean wrapIntoApplication) {
         for (ExecutionPlan recoveredJob : recoveredJobs) {
-            runRecoveredJob(recoveredJob);
+            runRecoveredJob(recoveredJob, wrapIntoApplication);
         }
         recoveredJobs.clear();
     }
 
-    private void runRecoveredJob(final ExecutionPlan recoveredJob) {
+    private void runRecoveredJob(
+            final ExecutionPlan recoveredJob, final boolean wrapIntoApplication) {

Review Comment:
   Could you add some comments to explain the behavior with regard to `wrapIntoApplication`?
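   For example, a short Javadoc on `runRecoveredJob` (placeholder wording only, the actual semantics need to come from the PR):
   ```java
   /**
    * Runs a job that was recovered from persistent storage.
    *
    * @param recoveredJob the recovered execution plan
    * @param wrapIntoApplication whether the recovered job should be wrapped into an application
    *     (TODO: explain when this is the case and what it means for the job's lifecycle)
    */
   ```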



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
