tillrohrmann commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r668621316



##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -650,6 +657,105 @@ public void testClusterDoesNOTShutdownWhenApplicationStatusUknown() throws Excep
         assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
     }
 
+    @Test
+    public void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
+        final JobID testJobID = new JobID(0, 2);
+        final Configuration configurationUnderTest = getConfiguration();
+        configurationUnderTest.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
+        configurationUnderTest.set(
+                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                new TestingDispatcherGateway.Builder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    final CompletableFuture<Acknowledge> submit =
+                                            new CompletableFuture<>();
+                                    submit.completeExceptionally(
+                                            DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                    testJobID));
+                                    return submit;
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        CompletableFuture.completedFuture(
+                                                createSuccessfulJobResult(jobId)));
+        final CompletableFuture<Void> applicationFuture =
+                runApplication(dispatcherBuilder, configurationUnderTest, 1);
+        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * In this scenario, job result is no longer present in the {@link
+     * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has terminated and job
+     * manager failed over), but we know that job has already terminated from {@link
+     * org.apache.flink.runtime.highavailability.RunningJobsRegistry running jobs registry}.
+     */
+    @Test
+    public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws Throwable {
+        final JobID testJobID = new JobID(0, 2);
+        final Configuration configurationUnderTest = getConfiguration();
+        configurationUnderTest.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
+        configurationUnderTest.set(
+                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                new TestingDispatcherGateway.Builder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    final CompletableFuture<Acknowledge> submit =
+                                            new CompletableFuture<>();
+                                    submit.completeExceptionally(
+                                            DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                    testJobID));
+                                    return submit;
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId ->
+                                        FutureUtils.completedExceptionally(
+                                                new FlinkJobNotFoundException(jobId)))
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        FutureUtils.completedExceptionally(
+                                                new FlinkJobNotFoundException(jobId)));
+        final CompletableFuture<Void> applicationFuture =
+                runApplication(dispatcherBuilder, configurationUnderTest, 1);
+        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
+        final JobID testJobID = new JobID(0, 2);
+        final Configuration configurationUnderTest = getConfiguration();
+        configurationUnderTest.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
+        configurationUnderTest.set(
+                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                new TestingDispatcherGateway.Builder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    final CompletableFuture<Acknowledge> submit =
+                                            new CompletableFuture<>();
+                                    submit.completeExceptionally(
+                                            DuplicateJobSubmissionException.of(testJobID));
+                                    return submit;

Review comment:
       ```suggestion
                                       return FutureUtils.completedExceptionally(
                                               DuplicateJobSubmissionException.of(testJobID));
   ```
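For reference, a standalone sketch of what a helper like `FutureUtils.completedExceptionally` presumably does (this is not Flink's source): it folds the create / `completeExceptionally` / return sequence into one expression. On Java 9+ the JDK's `CompletableFuture.failedFuture` behaves the same way. `FailedFutures` and `failureCause` are hypothetical names, and `IllegalStateException` stands in for `DuplicateJobSubmissionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Sketch of a "pre-failed future" helper; not Flink's FutureUtils source. */
class FailedFutures {

    /** Returns a future that is already completed exceptionally with {@code cause}. */
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Returns the cause a future failed with, or null if it completed normally. */
    static Throwable failureCause(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        // Stand-in for DuplicateJobSubmissionException.of(testJobID).
        final Throwable cause = new IllegalStateException("duplicate job");

        // Verbose form from the original test lambda.
        final CompletableFuture<String> verbose = new CompletableFuture<>();
        verbose.completeExceptionally(cause);

        // Single-expression forms: the helper and the JDK 9+ built-in.
        final CompletableFuture<String> viaHelper = completedExceptionally(cause);
        final CompletableFuture<String> viaJdk = CompletableFuture.failedFuture(cause);

        System.out.println(failureCause(verbose) == cause); // true
        System.out.println(failureCause(viaHelper) == cause); // true
        System.out.println(failureCause(viaJdk) == cause); // true
    }
}
```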

##########
File path: flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
##########
@@ -1261,6 +1261,43 @@ public static void assertNoException(CompletableFuture<?> completableFuture) {
         handleUncaughtException(completableFuture, FatalExitExceptionHandler.INSTANCE);
     }
 
+    /**
+     * Checks that the given {@link CompletableFuture} is not completed exceptionally with the
+     * specified class. If the future is completed exceptionally with the specific class, then try
+     * to recover using a given exception handler. If the exception does not match the specified
+     * class, just pass it through to later stages.
+     *
+     * @param completableFuture to assert for a given exception
+     * @param exceptionClass exception class to assert for
+     * @param exceptionHandler to call if the future is completed exceptionally with the specific
+     *     exception
+     * @return completable future, that can recover from a specified exception
+     */
+    public static <T, E extends Throwable> CompletableFuture<T> handleException(
+            CompletableFuture<T> completableFuture,
+            Class<E> exceptionClass,
+            Function<E, T> exceptionHandler) {

Review comment:
       For `T` it might not make a huge difference, but for `E` it can. That's why I would suggest changing the signature to
   
   ```
   CompletableFuture<? extends T> completableFuture,
   Class<E> exceptionClass,
   Function<? super E, ? extends T> exceptionHandler
   ```
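A self-contained sketch of `handleException` with the suggested wildcard signature, assuming the same semantics as the diff: only failures whose class matches `exceptionClass` are recovered, and a `CompletionException` wrapper is not unwrapped. `HandleExceptionSketch` is a hypothetical name; the sketch uses `Class.cast` for the downcast instead of an unchecked cast, and it additionally fails the returned future if the handler throws, which is an assumption not visible in the diff. The `main` method shows a call that only compiles with the wildcards: a `CompletableFuture<Integer>` recovered into a `CompletableFuture<Number>` by a handler typed against `RuntimeException` while matching `IllegalStateException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class HandleExceptionSketch {

    /**
     * Recovers from failures of type E via the handler; successful results and other failures
     * pass through unchanged. The throwable is matched as-is (no CompletionException unwrapping).
     */
    static <T, E extends Throwable> CompletableFuture<T> handleException(
            CompletableFuture<? extends T> completableFuture,
            Class<E> exceptionClass,
            Function<? super E, ? extends T> exceptionHandler) {
        final CompletableFuture<T> handledFuture = new CompletableFuture<>();
        completableFuture.whenComplete(
                (result, throwable) -> {
                    if (throwable == null) {
                        handledFuture.complete(result);
                    } else if (exceptionClass.isInstance(throwable)) {
                        // Checked downcast; no @SuppressWarnings("unchecked") needed.
                        final E exception = exceptionClass.cast(throwable);
                        try {
                            handledFuture.complete(exceptionHandler.apply(exception));
                        } catch (Throwable handlerFailure) {
                            // Assumption: a failing handler fails the returned future.
                            handledFuture.completeExceptionally(handlerFailure);
                        }
                    } else {
                        handledFuture.completeExceptionally(throwable);
                    }
                });
        return handledFuture;
    }

    public static void main(String[] args) {
        // A future of a subtype of T (Integer is a Number) ...
        final CompletableFuture<Integer> source = new CompletableFuture<>();
        // ... and a handler written against a supertype of E (RuntimeException).
        final Function<RuntimeException, Number> fallback = failure -> -1;
        // Neither argument fits the original (CompletableFuture<T>, Function<E, T>) signature.
        final CompletableFuture<Number> handled =
                HandleExceptionSketch.<Number, IllegalStateException>handleException(
                        source, IllegalStateException.class, fallback);
        source.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(handled.join()); // -1
    }
}
```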

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
                 jobIdsFuture.complete(applicationJobIds);
             }
         } catch (Throwable t) {
-            jobIdsFuture.completeExceptionally(
-                    new ApplicationExecutionException("Could not execute application.", t));
+            // If we're running in a single job execution mode, it's safe to consider re-submission
+            // of an already finished job a success.
+            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+                    ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class);
+            if (enforceSingleJobExecution
+                    && maybeDuplicate.isPresent()
+                    && maybeDuplicate.get().isGloballyTerminated()) {
+                final JobID jobId = maybeDuplicate.get().getJobID();
+                tolerateMissingResult.add(jobId);
+                jobIdsFuture.complete(Collections.singletonList(jobId));
+            } else {
+                jobIdsFuture.completeExceptionally(
+                        new ApplicationExecutionException("Could not execute application.", t));
+            }
         }
     }
 
     private CompletableFuture<Void> getApplicationResult(
             final DispatcherGateway dispatcherGateway,
             final Collection<JobID> applicationJobIds,
+            final Set<JobID> tolerateMissingResult,
             final ScheduledExecutor executor) {
         final List<CompletableFuture<?>> jobResultFutures =
                 applicationJobIds.stream()
                         .map(
                                 jobId ->
                                         unwrapJobResultException(
-                                                getJobResult(dispatcherGateway, jobId, executor)))
+                                                getJobResult(
+                                                        dispatcherGateway,
+                                                        jobId,
+                                                        executor,
+                                                        tolerateMissingResult.contains(jobId))))
                         .collect(Collectors.toList());
         return FutureUtils.waitForAll(jobResultFutures);
     }
 
     private CompletableFuture<JobResult> getJobResult(
             final DispatcherGateway dispatcherGateway,
             final JobID jobId,
-            final ScheduledExecutor scheduledExecutor) {
-
+            final ScheduledExecutor scheduledExecutor,
+            final boolean tolerateMissingResult) {
         final Time timeout =
                 Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
         final Time retryPeriod =
                 Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
-        return JobStatusPollingUtils.getJobResult(
-                dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
+        final CompletableFuture<JobResult> jobResultFuture =
+                JobStatusPollingUtils.getJobResult(
+                        dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
+        if (tolerateMissingResult) {
+            // Return "unknown" job result if dispatcher no longer knows the actual result.
+            return FutureUtils.handleException(
+                    jobResultFuture,
+                    FlinkJobNotFoundException.class,
+                    exception ->
+                            new JobResult.Builder()
+                                    .jobId(jobId)
+                                    .applicationStatus(ApplicationStatus.UNKNOWN)

Review comment:
       Also, we might want to prevent the user from calling `JobExecutionResult.getAccumulatorResult`. I would suggest that we fail if such a call happens.
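One way to fail fast, sketched with a hypothetical stand-in class: `ExecutionResultSketch` is not Flink's `JobExecutionResult`, its `getAccumulatorResult(String)` only loosely mirrors the real method's shape, and the choice of `UnsupportedOperationException` is an assumption. A result created for a job whose outcome is unknown throws on accumulator access instead of quietly returning `null`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for a job execution result (not Flink's JobExecutionResult). A result
 * whose outcome is unknown refuses accumulator access instead of returning null.
 */
class ExecutionResultSketch {

    private final String jobId;
    private final Map<String, Object> accumulators;
    private final boolean outcomeKnown;

    private ExecutionResultSketch(
            String jobId, Map<String, Object> accumulators, boolean outcomeKnown) {
        this.jobId = jobId;
        this.accumulators = accumulators;
        this.outcomeKnown = outcomeKnown;
    }

    static ExecutionResultSketch of(String jobId, Map<String, Object> accumulators) {
        return new ExecutionResultSketch(jobId, new HashMap<>(accumulators), true);
    }

    /** For a re-submitted job that had already terminated and whose result is gone. */
    static ExecutionResultSketch unknown(String jobId) {
        return new ExecutionResultSketch(jobId, Collections.emptyMap(), false);
    }

    @SuppressWarnings("unchecked")
    <T> T getAccumulatorResult(String accumulatorName) {
        if (!outcomeKnown) {
            // Fail loudly: a null accumulator would be indistinguishable from a real result.
            throw new UnsupportedOperationException(
                    "Accumulator '" + accumulatorName + "' is unavailable: job " + jobId
                            + " terminated before this submission and its result is unknown.");
        }
        return (T) accumulators.get(accumulatorName);
    }

    public static void main(String[] args) {
        final ExecutionResultSketch known =
                of("job-1", Collections.singletonMap("numRecords", (Object) 42L));
        final Long records = known.getAccumulatorResult("numRecords");
        System.out.println(records); // 42

        try {
            unknown("job-1").getAccumulatorResult("numRecords");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```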

##########
File path: flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
##########
@@ -1261,6 +1261,43 @@ public static void assertNoException(CompletableFuture<?> completableFuture) {
         handleUncaughtException(completableFuture, FatalExitExceptionHandler.INSTANCE);
     }
 
+    /**
+     * Checks that the given {@link CompletableFuture} is not completed exceptionally with the
+     * specified class. If the future is completed exceptionally with the specific class, then try
+     * to recover using a given exception handler. If the exception does not match the specified
+     * class, just pass it through to later stages.
+     *
+     * @param completableFuture to assert for a given exception
+     * @param exceptionClass exception class to assert for
+     * @param exceptionHandler to call if the future is completed exceptionally with the specific
+     *     exception
+     * @return completable future, that can recover from a specified exception
+     */
+    public static <T, E extends Throwable> CompletableFuture<T> handleException(
+            CompletableFuture<T> completableFuture,
+            Class<E> exceptionClass,
+            Function<E, T> exceptionHandler) {
+        final CompletableFuture<T> handledFuture = new CompletableFuture<>();
+        checkNotNull(completableFuture)
+                .whenComplete(
+                        (result, throwable) -> {
+                            if (throwable == null) {
+                                handledFuture.complete(result);
+                            } else if (exceptionClass.isAssignableFrom(throwable.getClass())) {
+                                @SuppressWarnings("unchecked")
+                                final E exception = (E) throwable;

Review comment:
       ```suggestion
                                   final E cast = exceptionClass.cast(throwable);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateJobSubmissionException.java
##########
@@ -25,7 +25,27 @@
  */
 public class DuplicateJobSubmissionException extends JobSubmissionException {
 
-    public DuplicateJobSubmissionException(JobID jobID) {
+    public static DuplicateJobSubmissionException of(JobID jobId) {
+        return new DuplicateJobSubmissionException(jobId, false);
+    }
+
+    public static DuplicateJobSubmissionException ofGloballyTerminated(JobID jobId) {
+        return new DuplicateJobSubmissionException(jobId, true);
+    }

Review comment:
       I like this solution :-)
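The appeal of the named factories is that the boolean flag never appears at call sites. A simplified, self-contained version of the pattern follows; `DuplicateSubmissionSketch` is a hypothetical name, it extends `Exception` instead of `JobSubmissionException`, takes a `String` instead of a `JobID`, and its message text is invented for the sketch.

```java
/** Simplified sketch of the named-factory pattern; not Flink's DuplicateJobSubmissionException. */
class DuplicateSubmissionSketch extends Exception {

    private static final long serialVersionUID = 1L;

    private final String jobId;
    private final boolean globallyTerminated;

    // Private constructor: callers choose a named factory instead of passing a bare boolean.
    private DuplicateSubmissionSketch(String jobId, boolean globallyTerminated) {
        super("Job " + jobId + " has already been submitted.");
        this.jobId = jobId;
        this.globallyTerminated = globallyTerminated;
    }

    /** The job is still known to the cluster (for example, still running). */
    static DuplicateSubmissionSketch of(String jobId) {
        return new DuplicateSubmissionSketch(jobId, false);
    }

    /** The job already reached a globally terminal state. */
    static DuplicateSubmissionSketch ofGloballyTerminated(String jobId) {
        return new DuplicateSubmissionSketch(jobId, true);
    }

    String getJobId() {
        return jobId;
    }

    boolean isGloballyTerminated() {
        return globallyTerminated;
    }

    public static void main(String[] args) {
        System.out.println(of("job-1").isGloballyTerminated()); // false
        System.out.println(ofGloballyTerminated("job-1").isGloballyTerminated()); // true
    }
}
```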

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -824,6 +825,63 @@ private void testFutureContinuation(
         Assert.assertTrue(continuationFuture.isDone());
     }
 
+    @Test
+    public void testHandleExceptionWithCompletedFuture() {
+        final CompletableFuture<String> future = CompletableFuture.completedFuture("foobar");
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(future, Exception.class, exception -> "handled");
+        assertEquals("foobar", handled.join());
+    }
+
+    @Test
+    public void testHandleExceptionWithNormalCompletion() {
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(future, Exception.class, exception -> "handled");
+        future.complete("foobar");
+        assertEquals("foobar", handled.join());
+    }
+
+    @Test
+    public void testHandleExceptionWithMatchingExceptionallyCompletedFuture() {
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(
+                        future, UnsupportedOperationException.class, exception -> "handled");
+        future.completeExceptionally(new UnsupportedOperationException("foobar"));
+        assertEquals("handled", handled.join());
+    }
+
+    @Test
+    public void testHandleExceptionWithNotMatchingExceptionallyCompletedFuture() {
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(
+                        future, UnsupportedOperationException.class, exception -> "handled");
+        future.completeExceptionally(new IllegalArgumentException("foobar"));
+        final CompletionException completionException =
+                assertThrows(CompletionException.class, handled::join);
+        assertTrue(completionException.getCause() instanceof IllegalArgumentException);
+        assertEquals("foobar", completionException.getCause().getMessage());

Review comment:
       Might be simplified to `assertEquals(completionException.getCause(), failureCause)`.
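For context: `Throwable` does not override `equals`, so `assertEquals` on two exceptions is an identity check, the same as `assertSame`. Keeping the injected exception in a local (the `failureCause` in the comment) therefore verifies that the original instance is propagated, not merely an exception with the same type and message. A dependency-free sketch, using a plain `CompletableFuture` in place of `FutureUtils.handleException`; `SameCauseCheck` and `joinFailureCause` are hypothetical names. Note that JUnit's `assertEquals` takes the expected value first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SameCauseCheck {

    /** Joins the future and returns the cause of the CompletionException it throws. */
    static Throwable joinFailureCause(CompletableFuture<?> future) {
        try {
            future.join();
        } catch (CompletionException e) {
            return e.getCause();
        }
        throw new AssertionError("Expected the future to complete exceptionally.");
    }

    public static void main(String[] args) {
        // Keep a reference to the injected failure so the exact instance can be compared.
        final IllegalArgumentException failureCause = new IllegalArgumentException("foobar");
        final CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(failureCause);

        final Throwable actual = joinFailureCause(future);
        // Identity comparison: what assertEquals(failureCause, actual) or
        // assertSame(failureCause, actual) checks, since Throwable keeps Object.equals.
        if (actual != failureCause) {
            throw new AssertionError("Unexpected cause: " + actual);
        }
        // A different instance with the same type and message is not equal.
        System.out.println(failureCause.equals(new IllegalArgumentException("foobar"))); // false
    }
}
```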

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -824,6 +825,63 @@ private void testFutureContinuation(
         Assert.assertTrue(continuationFuture.isDone());
     }
 
+    @Test
+    public void testHandleExceptionWithCompletedFuture() {
+        final CompletableFuture<String> future = CompletableFuture.completedFuture("foobar");
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(future, Exception.class, exception -> "handled");
+        assertEquals("foobar", handled.join());
+    }
+
+    @Test
+    public void testHandleExceptionWithNormalCompletion() {
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(future, Exception.class, exception -> "handled");
+        future.complete("foobar");
+        assertEquals("foobar", handled.join());
+    }
+
+    @Test
+    public void testHandleExceptionWithMatchingExceptionallyCompletedFuture() {
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(
+                        future, UnsupportedOperationException.class, exception -> "handled");
+        future.completeExceptionally(new UnsupportedOperationException("foobar"));
+        assertEquals("handled", handled.join());
+    }
+
+    @Test
+    public void testHandleExceptionWithNotMatchingExceptionallyCompletedFuture() {
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        final CompletableFuture<String> handled =
+                FutureUtils.handleException(
+                        future, UnsupportedOperationException.class, exception -> "handled");
+        future.completeExceptionally(new IllegalArgumentException("foobar"));
+        final CompletionException completionException =
+                assertThrows(CompletionException.class, handled::join);
+        assertTrue(completionException.getCause() instanceof IllegalArgumentException);
+        assertEquals("foobar", completionException.getCause().getMessage());

Review comment:
       Alternatively, we could think about adding a `Matcher` to `FlinkMatchers` that checks the type and message of an exception.
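A dependency-free sketch of such a check. A real `FlinkMatchers` addition would presumably be a Hamcrest matcher (for example, built on `TypeSafeMatcher`); here `ExceptionMatchers.hasTypeAndMessage` is a hypothetical helper that returns a `java.util.function.Predicate`, so the example compiles without Hamcrest on the classpath.

```java
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical helper; a Hamcrest-based FlinkMatchers version would look different. */
class ExceptionMatchers {

    /** Matches a throwable that is an instance of {@code type} and has exactly {@code message}. */
    static Predicate<Throwable> hasTypeAndMessage(
            Class<? extends Throwable> type, String message) {
        // isInstance(null) is false, so getMessage() is never called on null.
        return throwable ->
                type.isInstance(throwable) && Objects.equals(message, throwable.getMessage());
    }

    /** Throws an AssertionError with a readable description if {@code actual} does not match. */
    static void assertTypeAndMessage(
            Throwable actual, Class<? extends Throwable> type, String message) {
        if (!hasTypeAndMessage(type, message).test(actual)) {
            throw new AssertionError(
                    "Expected " + type.getName() + " with message \"" + message
                            + "\" but got " + actual);
        }
    }

    public static void main(String[] args) {
        final Throwable cause = new IllegalArgumentException("foobar");
        // Covers both the instanceof check and the separate message assertion in the test.
        assertTypeAndMessage(cause, IllegalArgumentException.class, "foobar");
        System.out.println(hasTypeAndMessage(IllegalStateException.class, "foobar").test(cause)); // false
    }
}
```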

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
                 jobIdsFuture.complete(applicationJobIds);
             }
         } catch (Throwable t) {
-            jobIdsFuture.completeExceptionally(
-                    new ApplicationExecutionException("Could not execute application.", t));
+            // If we're running in a single job execution mode, it's safe to consider re-submission
+            // of an already finished job a success.
+            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+                    ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class);
+            if (enforceSingleJobExecution
+                    && maybeDuplicate.isPresent()
+                    && maybeDuplicate.get().isGloballyTerminated()) {
+                final JobID jobId = maybeDuplicate.get().getJobID();
+                tolerateMissingResult.add(jobId);
+                jobIdsFuture.complete(Collections.singletonList(jobId));
+            } else {
+                jobIdsFuture.completeExceptionally(
+                        new ApplicationExecutionException("Could not execute application.", t));
+            }
         }
     }
 
     private CompletableFuture<Void> getApplicationResult(
             final DispatcherGateway dispatcherGateway,
             final Collection<JobID> applicationJobIds,
+            final Set<JobID> tolerateMissingResult,
             final ScheduledExecutor executor) {
         final List<CompletableFuture<?>> jobResultFutures =
                 applicationJobIds.stream()
                         .map(
                                 jobId ->
                                         unwrapJobResultException(
-                                                getJobResult(dispatcherGateway, jobId, executor)))
+                                                getJobResult(
+                                                        dispatcherGateway,
+                                                        jobId,
+                                                        executor,
+                                                        tolerateMissingResult.contains(jobId))))
                         .collect(Collectors.toList());
         return FutureUtils.waitForAll(jobResultFutures);
     }
 
     private CompletableFuture<JobResult> getJobResult(
             final DispatcherGateway dispatcherGateway,
             final JobID jobId,
-            final ScheduledExecutor scheduledExecutor) {
-
+            final ScheduledExecutor scheduledExecutor,
+            final boolean tolerateMissingResult) {
         final Time timeout =
                 Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
         final Time retryPeriod =
                 Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
-        return JobStatusPollingUtils.getJobResult(
-                dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
+        final CompletableFuture<JobResult> jobResultFuture =
+                JobStatusPollingUtils.getJobResult(
+                        dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
+        if (tolerateMissingResult) {
+            // Return "unknown" job result if dispatcher no longer knows the actual result.
+            return FutureUtils.handleException(
+                    jobResultFuture,
+                    FlinkJobNotFoundException.class,
+                    exception ->
+                            new JobResult.Builder()
+                                    .jobId(jobId)
+                                    .applicationStatus(ApplicationStatus.UNKNOWN)

Review comment:
       I think an `ApplicationStatus.UNKNOWN` will result in a `JobExecutionException` when calling `JobResult.toJobExecutionResult`, which would then fail the client when it calls `StreamExecutionEnvironment.execute()`.
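The concern can be illustrated with a simplified model, assuming, as the comment suggests, that the conversion only succeeds for `ApplicationStatus.SUCCEEDED` and throws for every other status. `JobResultModel`, `Status`, `ExecutionFailure` and `ExecutionResult` are hypothetical stand-ins for `JobResult`, `ApplicationStatus`, `JobExecutionException` and `JobExecutionResult`; they are not Flink's code.

```java
import java.util.Collections;
import java.util.Map;

class JobResultModel {

    enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** Simplified stand-in for a job execution failure. */
    static class ExecutionFailure extends Exception {
        private static final long serialVersionUID = 1L;

        ExecutionFailure(String message) {
            super(message);
        }
    }

    /** Simplified stand-in for the client-side execution result. */
    static final class ExecutionResult {
        final String jobId;
        final Map<String, Object> accumulators;

        ExecutionResult(String jobId, Map<String, Object> accumulators) {
            this.jobId = jobId;
            this.accumulators = accumulators;
        }
    }

    private final String jobId;
    private final Status status;

    JobResultModel(String jobId, Status status) {
        this.jobId = jobId;
        this.status = status;
    }

    /** Assumed behaviour: anything other than SUCCEEDED is reported to the caller as a failure. */
    ExecutionResult toExecutionResult() throws ExecutionFailure {
        if (status == Status.SUCCEEDED) {
            return new ExecutionResult(jobId, Collections.emptyMap());
        }
        throw new ExecutionFailure("Job " + jobId + " completed with status " + status + '.');
    }

    public static void main(String[] args) {
        try {
            new JobResultModel("job-1", Status.UNKNOWN).toExecutionResult();
            System.out.println("converted");
        } catch (ExecutionFailure e) {
            // The path a re-submitted, already finished job would take under this assumption.
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```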




-- 
This is an automated message from the Apache Git Service.