dmvk commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r668689540



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
                 jobIdsFuture.complete(applicationJobIds);
             }
         } catch (Throwable t) {
-            jobIdsFuture.completeExceptionally(
-                    new ApplicationExecutionException("Could not execute 
application.", t));
+            // If we're running in a single job execution mode, it's safe to 
consider re-submission
+            // of an already finished a success.
+            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+                    ExceptionUtils.findThrowable(t, 
DuplicateJobSubmissionException.class);
+            if (enforceSingleJobExecution
+                    && maybeDuplicate.isPresent()
+                    && maybeDuplicate.get().isGloballyTerminated()) {
+                final JobID jobId = maybeDuplicate.get().getJobID();
+                tolerateMissingResult.add(jobId);
+                jobIdsFuture.complete(Collections.singletonList(jobId));
+            } else {
+                jobIdsFuture.completeExceptionally(
+                        new ApplicationExecutionException("Could not execute 
application.", t));
+            }
         }
     }
 
     private CompletableFuture<Void> getApplicationResult(
             final DispatcherGateway dispatcherGateway,
             final Collection<JobID> applicationJobIds,
+            final Set<JobID> tolerateMissingResult,
             final ScheduledExecutor executor) {
         final List<CompletableFuture<?>> jobResultFutures =
                 applicationJobIds.stream()
                         .map(
                                 jobId ->
                                         unwrapJobResultException(
-                                                
getJobResult(dispatcherGateway, jobId, executor)))
+                                                getJobResult(
+                                                        dispatcherGateway,
+                                                        jobId,
+                                                        executor,
+                                                        
tolerateMissingResult.contains(jobId))))
                         .collect(Collectors.toList());
         return FutureUtils.waitForAll(jobResultFutures);
     }
 
     private CompletableFuture<JobResult> getJobResult(
             final DispatcherGateway dispatcherGateway,
             final JobID jobId,
-            final ScheduledExecutor scheduledExecutor) {
-
+            final ScheduledExecutor scheduledExecutor,
+            final boolean tolerateMissingResult) {
         final Time timeout =
                 
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
         final Time retryPeriod =
                 
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
-        return JobStatusPollingUtils.getJobResult(
-                dispatcherGateway, jobId, scheduledExecutor, timeout, 
retryPeriod);
+        final CompletableFuture<JobResult> jobResultFuture =
+                JobStatusPollingUtils.getJobResult(
+                        dispatcherGateway, jobId, scheduledExecutor, timeout, 
retryPeriod);
+        if (tolerateMissingResult) {
+            // Return "unknown" job result if dispatcher no longer knows the 
actual result.
+            return FutureUtils.handleException(
+                    jobResultFuture,
+                    FlinkJobNotFoundException.class,
+                    exception ->
+                            new JobResult.Builder()
+                                    .jobId(jobId)
+                                    
.applicationStatus(ApplicationStatus.UNKNOWN)

Review comment:
       To sum up an offline discussion with Till: we can live with this limitation 
for now, until we solve 
[FLINK-11813](https://issues.apache.org/jira/browse/FLINK-11813). We'll 
document the limitation in the release notes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to