tillrohrmann commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r510912135



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -124,62 +125,64 @@ public void stop() {
                return applicationExecutionTask;
        }
 
-       /**
-        * Runs the user program entrypoint using {@link #runApplicationAsync(DispatcherGateway,
-        * ScheduledExecutor, boolean)} and shuts down the given dispatcher when the application
-        * completes (either successfully or in case of failure).
-        */
        @VisibleForTesting
-       CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
-                       final DispatcherGateway dispatcher,
-                       final ScheduledExecutor scheduledExecutor) {
+       CompletableFuture<Void> getApplicationCompletionFuture() {
+               return applicationCompletionFuture;
+       }
 
-               applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);
+       @VisibleForTesting
+       CompletableFuture<Acknowledge> getClusterShutdownFuture() {
+               return clusterShutdownFuture;
+       }
 
+       /**
+        * Runs the user program entrypoint and shuts down the given dispatcherGateway when
+        * the application completes (either successfully or in case of failure).
+        */
+       private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(final DispatcherGateway dispatcherGateway) {
                return applicationCompletionFuture
                                .handle((r, t) -> {
-                                       final ApplicationStatus applicationStatus;
-                                       if (t != null) {
-
-                                               final Optional<JobCancellationException> cancellationException =
-                                                               ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-                                               if (cancellationException.isPresent()) {
-                                                       // this means the Flink Job was cancelled
-                                                       applicationStatus = ApplicationStatus.CANCELED;
-                                               } else if (t instanceof CancellationException) {
-                                                       // this means that the future was cancelled
-                                                       applicationStatus = ApplicationStatus.UNKNOWN;
-                                               } else {
-                                                       applicationStatus = ApplicationStatus.FAILED;
-                                               }
 
-                                               LOG.warn("Application {}: ", applicationStatus, t);
-                                       } else {
-                                               applicationStatus = ApplicationStatus.SUCCEEDED;
+                                       if (t == null) {
                                                LOG.info("Application completed SUCCESSFULLY");
+                                               return dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+                                       }
+
+                                       final Optional<ApplicationFailureException> exception =
+                                                       ExceptionUtils.findThrowable(t, ApplicationFailureException.class);
+
+                                       if (exception.isPresent()) {
+                                               final ApplicationStatus applicationStatus = exception.get().getStatus();
+
+                                               if (applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) {
+                                                       LOG.info("Application {}: ", applicationStatus, t);
+                                                       return dispatcherGateway.shutDownCluster(applicationStatus);
+                                               }
                                        }
-                                       return dispatcher.shutDownCluster(applicationStatus);
+
+                                       LOG.warn("Exiting with Application Status UNKNOWN: ", t);

Review comment:
       Maybe 
   ```suggestion
                                        LOG.warn("Application failed unexpectedly: ", t);
   ```
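The new `handle` branch in this hunk makes a small decision. On success it shuts the cluster down with `SUCCEEDED`. If an `ApplicationFailureException` carrying `CANCELED` or `FAILED` is found in the cause chain, it shuts the cluster down with that status. Anything else reaches the `LOG.warn` line that the suggestion rewords, and the visible part of the hunk shows no shutdown call for that case.

Below is a minimal, self-contained sketch of just that decision. It is not the PR's code. `AppStatus`, `AppFailure` and `ShutdownDecision` are hypothetical stand-ins for `ApplicationStatus`, `ApplicationFailureException` and the lambda. `findThrowable` only mimics the cause-chain search that the diff delegates to `ExceptionUtils.findThrowable`.

```java
import java.util.Optional;

// Hypothetical stand-in for ApplicationStatus (only the values the handler uses).
enum AppStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

// Hypothetical stand-in for ApplicationFailureException: carries the final status.
class AppFailure extends RuntimeException {
    private final AppStatus status;

    AppFailure(AppStatus status, Throwable cause) {
        super("Application " + status, cause);
        this.status = status;
    }

    AppStatus getStatus() {
        return status;
    }
}

final class ShutdownDecision {
    private ShutdownDecision() {}

    // Returns the first throwable of the given type in the cause chain, if any.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }

    // Maps the outcome of the application future to the status the cluster should
    // be shut down with; empty means "do not shut the cluster down".
    static Optional<AppStatus> statusForShutdown(Throwable t) {
        if (t == null) {
            return Optional.of(AppStatus.SUCCEEDED);
        }
        Optional<AppFailure> failure = findThrowable(t, AppFailure.class);
        if (failure.isPresent()) {
            AppStatus status = failure.get().getStatus();
            if (status == AppStatus.CANCELED || status == AppStatus.FAILED) {
                return Optional.of(status);
            }
        }
        // UNKNOWN status or an unexpected exception: leave the cluster running.
        return Optional.empty();
    }
}
```

In this sketch, `Optional.empty()` means "leave the cluster running". In the diff, that case is handled after the `LOG.warn` line, which the hunk does not show.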

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -301,69 +306,132 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
                                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                                .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
 
-               ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+               // we're "listening" on this to be completed to verify that the error handler is called.
+               // In production, this will shut down the cluster with an exception.
+               final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+                               3, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
 
                ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture();
 
                bootstrap.stop();
 
-               // wait until the bootstrap "thinks" it's done
-               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+               // we call the error handler
+               assertException(errorHandlerFuture, CancellationException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, CancellationException.class);
 
                // verify that the application task is being cancelled
                assertThat(applicationExecutionFuture.isCancelled(), is(true));
        }
 
        @Test
-       public void testClusterShutdownWhenStoppingBootstrap() throws Exception {
+       public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws Exception {
+               final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+                               .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+                               .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
+
+               // we're "listening" on this to be completed to verify that the error handler is called.
+               // In production, this will shut down the cluster with an exception.
+               final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+                               2, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+               bootstrap.stop();
+
+               // we call the error handler
+               assertException(errorHandlerFuture, CancellationException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, CancellationException.class);
+       }
+
+       @Test
+       public void testErrorHandlerIsCalledWhenSubmissionFails() throws Exception {
+               final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
+               final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+                               .setSubmitFunction(jobGraph -> {
+                                       throw new FlinkRuntimeException("Nope!");
+                               })
+                               .setClusterShutdownFunction(status -> {
+                                       if (clusterShutdown.f0) {
+                                               throw new FlinkRuntimeException("This should be called only once");
+                                       }
+                                       clusterShutdown.f0 = true;
+                                       return CompletableFuture.completedFuture(Acknowledge.get());
+                               });
+
+               // we're "listening" on this to be completed to verify that the error handler is called.
+               // In production, this will shut down the cluster with an exception.
+               final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+               final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();
+               final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+                               3, dispatcherGateway, scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+               // we call the error handler
+               assertException(errorHandlerFuture, ApplicationExecutionException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, ApplicationExecutionException.class);
+
+               assertFalse(clusterShutdown.f0);
+               dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+               assertTrue(clusterShutdown.f0);

Review comment:
       Why is this necessary? Couldn't we write the test w/o it and then directly fail if `shutDownCluster` is being called?
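One way to read this suggestion is to replace the `Tuple1<Boolean>` flag with a shutdown stub that raises an `AssertionError` the moment it is invoked. This is a sketch under assumptions. `ShutdownStubs.failIfCalled` is a hypothetical helper, not a Flink API. It is generic so that it could be passed in directly, assuming `setClusterShutdownFunction` takes a `Function<ApplicationStatus, CompletableFuture<Acknowledge>>`, as the lambdas in the diff suggest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper (not part of Flink): a cluster-shutdown stub that fails
// the test as soon as it is invoked.
final class ShutdownStubs {
    private ShutdownStubs() {}

    // S is the status type, R the acknowledgement type; both are inferred from
    // the parameter type of the builder method the stub is passed to.
    static <S, R> Function<S, CompletableFuture<R>> failIfCalled() {
        return status -> {
            throw new AssertionError("shutDownCluster must not be called, but was called with " + status);
        };
    }
}
```

In the test this would read `.setClusterShutdownFunction(ShutdownStubs.failIfCalled())`, and the trailing `assertFalse`/`shutDownCluster`/`assertTrue` lines could go. One caveat applies. If the bootstrap invokes the stub inside an asynchronous callback, the `AssertionError` may complete a future instead of failing the test thread, so the check is only as direct as the call path.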

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -373,57 +441,61 @@ public void testClusterShutdownWhenSubmissionFails() throws Exception {
        }
 
        @Test
-       public void testClusterShutdownWhenApplicationSucceeds() throws Exception {
+       public void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
                // we're "listening" on this to be completed to verify that the cluster
                // is being shut down from the ApplicationDispatcherBootstrap
                final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>();
 
                final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
                                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
-                               .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
-                               .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId)))
+                               .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
+                               .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createCancelledJobResult(jobId)))
                                .setClusterShutdownFunction((status) -> {
                                        externalShutdownFuture.complete(status);
                                        return CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+               ApplicationDispatcherBootstrap bootstrap =
+                               createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
 
                // wait until the bootstrap "thinks" it's done
                shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
                // verify that the dispatcher is actually being shut down
-               assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.SUCCEEDED));
+               assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.CANCELED));
        }
 
        @Test
-       public void testClusterShutdownWhenApplicationFails() throws Exception {
+       public void testClusterDoesNOTShutdownWhenApplicationStatusUnknown() throws Exception {
                // we're "listening" on this to be completed to verify that the cluster
                // is being shut down from the ApplicationDispatcherBootstrap
-               final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>();
-
+               final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
                final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
                                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                                .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
-                               .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createFailedJobResult(jobId)))
-                               .setClusterShutdownFunction((status) -> {
-                                       externalShutdownFuture.complete(status);
+                               .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createUnknownJobResult(jobId)))
+                               .setClusterShutdownFunction(status -> {
+                                       if (clusterShutdown.f0) {
+                                               throw new FlinkRuntimeException("This should be called only once");
+                                       }
+                                       clusterShutdown.f0 = true;
                                        return CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+               final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();
+               final ApplicationDispatcherBootstrap bootstrap =
+                               createApplicationDispatcherBootstrap(3, dispatcherGateway, scheduledExecutor);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+               final CompletableFuture<Acknowledge> applicationFuture = bootstrap.getClusterShutdownFuture();
 
-               // wait until the bootstrap "thinks" it's done
-               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+               final ApplicationFailureException exception = assertException(applicationFuture, ApplicationFailureException.class);
+               assertEquals(ApplicationStatus.UNKNOWN, exception.getStatus());
 
-               // verify that the dispatcher is actually being shut down
-               assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.FAILED));
+               assertFalse(clusterShutdown.f0);
+               dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+               assertTrue(clusterShutdown.f0);

Review comment:
       Same here, why do we need this?
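All three test hunks rely on an `assertException(future, type)` helper whose body is not part of this diff. The UNKNOWN test also uses its return value to check `getStatus()`. Below is a minimal sketch of what such a helper could look like, assuming it should wait with a timeout and search the cause chain. `FutureAssertions.assertFailsWith` is a hypothetical name, and the timeout parameter stands in for the test class's `TIMEOUT_SECONDS`.

One JDK detail matters for the `CancellationException` cases. `CompletableFuture.get` rethrows a `CancellationException` directly instead of wrapping it in an `ExecutionException`, so both paths have to be handled.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureAssertions {
    private FutureAssertions() {}

    // Hypothetical helper: waits for the future and returns the first exception of
    // the expected type in its failure's cause chain, or fails with AssertionError.
    static <T extends Throwable> T assertFailsWith(
            CompletableFuture<?> future, Class<T> expected, long timeoutSeconds) {
        Throwable failure;
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
            throw new AssertionError("Expected failure with " + expected.getName()
                    + ", but the future completed normally");
        } catch (ExecutionException e) {
            // get() wraps the original failure, so unwrap one level.
            failure = e.getCause();
        } catch (CancellationException e) {
            // get() rethrows a CancellationException as-is instead of wrapping it.
            failure = e;
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + timeoutSeconds + "s", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
        }
        throw new AssertionError("Expected " + expected.getName() + " in the cause chain of " + failure, failure);
    }
}
```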

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -301,69 +306,132 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
                                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                                .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
 
-               ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+               // we're "listening" on this to be completed to verify that the error handler is called.
+               // In production, this will shut down the cluster with an exception.
+               final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+                               3, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
 
                ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture();
 
                bootstrap.stop();
 
-               // wait until the bootstrap "thinks" it's done
-               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+               // we call the error handler
+               assertException(errorHandlerFuture, CancellationException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, CancellationException.class);
 
                // verify that the application task is being cancelled
                assertThat(applicationExecutionFuture.isCancelled(), is(true));
        }
 
        @Test
-       public void testClusterShutdownWhenStoppingBootstrap() throws Exception {
+       public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws Exception {
+               final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+                               .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+                               .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
+
+               // we're "listening" on this to be completed to verify that the error handler is called.
+               // In production, this will shut down the cluster with an exception.
+               final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+                               2, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+               bootstrap.stop();
+
+               // we call the error handler
+               assertException(errorHandlerFuture, CancellationException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, CancellationException.class);
+       }
+
+       @Test
+       public void testErrorHandlerIsCalledWhenSubmissionFails() throws Exception {
+               final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
+               final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+                               .setSubmitFunction(jobGraph -> {
+                                       throw new FlinkRuntimeException("Nope!");
+                               })
+                               .setClusterShutdownFunction(status -> {
+                                       if (clusterShutdown.f0) {
+                                               throw new FlinkRuntimeException("This should be called only once");
+                                       }
+                                       clusterShutdown.f0 = true;
+                                       return CompletableFuture.completedFuture(Acknowledge.get());
+                               });
+
+               // we're "listening" on this to be completed to verify that the error handler is called.
+               // In production, this will shut down the cluster with an exception.
+               final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+               final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();
+               final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+                               3, dispatcherGateway, scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+               final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+               // we call the error handler
+               assertException(errorHandlerFuture, ApplicationExecutionException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, ApplicationExecutionException.class);
+
+               assertFalse(clusterShutdown.f0);
+               dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+               assertTrue(clusterShutdown.f0);

Review comment:
       Or we simply add a counter to `setClusterShutdownFunction` and assert that its value is `0` at the end of the test?
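Here is a minimal sketch of the counter variant. It assumes `setClusterShutdownFunction` takes the same `Function<ApplicationStatus, CompletableFuture<Acknowledge>>` shape that the lambdas in the diff use. `CountingShutdownStub` is a hypothetical helper. It uses an `AtomicInteger` because the shutdown function may be called from the bootstrap's executor thread rather than the test thread. The atomic makes cross-thread visibility explicit instead of relying on the future chain for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: a shutdown stub that only records how often it is called.
final class CountingShutdownStub<S, R> {
    private final AtomicInteger calls = new AtomicInteger();
    private final R ack;

    CountingShutdownStub(R ack) {
        this.ack = ack;
    }

    // The function to hand to the builder: counts the call and acknowledges it.
    Function<S, CompletableFuture<R>> function() {
        return status -> {
            calls.incrementAndGet();
            return CompletableFuture.completedFuture(ack);
        };
    }

    int callCount() {
        return calls.get();
    }
}
```

In the test, create `CountingShutdownStub<ApplicationStatus, Acknowledge> shutdowns = new CountingShutdownStub<>(Acknowledge.get());`, pass `shutdowns.function()` to the builder, and end with `assertEquals(0, shutdowns.callCount())`. Unlike the throwing variant, the counter never raises inside the code under test, so nothing can be swallowed. The final check is only meaningful once the test has waited for the shutdown future to fail, which the `assertException` calls already do.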





Reply via email to