dmvk commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r668652644



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
                 jobIdsFuture.complete(applicationJobIds);
             }
         } catch (Throwable t) {
-            jobIdsFuture.completeExceptionally(
-                    new ApplicationExecutionException("Could not execute 
application.", t));
+            // If we're running in a single job execution mode, it's safe to 
consider re-submission
+            // of an already finished a success.
+            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+                    ExceptionUtils.findThrowable(t, 
DuplicateJobSubmissionException.class);
+            if (enforceSingleJobExecution
+                    && maybeDuplicate.isPresent()
+                    && maybeDuplicate.get().isGloballyTerminated()) {
+                final JobID jobId = maybeDuplicate.get().getJobID();
+                tolerateMissingResult.add(jobId);
+                jobIdsFuture.complete(Collections.singletonList(jobId));
+            } else {
+                jobIdsFuture.completeExceptionally(
+                        new ApplicationExecutionException("Could not execute 
application.", t));
+            }
         }
     }
 
     private CompletableFuture<Void> getApplicationResult(
             final DispatcherGateway dispatcherGateway,
             final Collection<JobID> applicationJobIds,
+            final Set<JobID> tolerateMissingResult,
             final ScheduledExecutor executor) {
         final List<CompletableFuture<?>> jobResultFutures =
                 applicationJobIds.stream()
                         .map(
                                 jobId ->
                                         unwrapJobResultException(
-                                                
getJobResult(dispatcherGateway, jobId, executor)))
+                                                getJobResult(
+                                                        dispatcherGateway,
+                                                        jobId,
+                                                        executor,
+                                                        
tolerateMissingResult.contains(jobId))))
                         .collect(Collectors.toList());
         return FutureUtils.waitForAll(jobResultFutures);
     }
 
     private CompletableFuture<JobResult> getJobResult(
             final DispatcherGateway dispatcherGateway,
             final JobID jobId,
-            final ScheduledExecutor scheduledExecutor) {
-
+            final ScheduledExecutor scheduledExecutor,
+            final boolean tolerateMissingResult) {
         final Time timeout =
                 
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
         final Time retryPeriod =
                 
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
-        return JobStatusPollingUtils.getJobResult(
-                dispatcherGateway, jobId, scheduledExecutor, timeout, 
retryPeriod);
+        final CompletableFuture<JobResult> jobResultFuture =
+                JobStatusPollingUtils.getJobResult(
+                        dispatcherGateway, jobId, scheduledExecutor, timeout, 
retryPeriod);
+        if (tolerateMissingResult) {
+            // Return "unknown" job result if dispatcher no longer knows the 
actual result.
+            return FutureUtils.handleException(
+                    jobResultFuture,
+                    FlinkJobNotFoundException.class,
+                    exception ->
+                            new JobResult.Builder()
+                                    .jobId(jobId)
+                                    
.applicationStatus(ApplicationStatus.UNKNOWN)

Review comment:
       Hmm, this is tricky. Right now I think we're just failing call to 
`execute()` with DuplicateJobSubmission, so there should be no more calls from 
client code. We're basically just ignoring that exception in 
ApplicationDispatcherBoostrap so cluster can properly cleanup.
   
   Letting user to intetract with the result would probably require some 
changes to EmbeddedJobClient 🤔 I need to give a closer look, if there is any 
way we can move the duplication handling there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to