tillrohrmann commented on a change in pull request #13583:
URL: https://github.com/apache/flink/pull/13583#discussion_r503761015



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
##########
@@ -92,6 +91,7 @@ public ApplicationDispatcherGatewayServiceFactory(
                        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
                }
 
+               bootstrap.setErrorHandler(exception -> dispatcher.getShutDownFuture().completeExceptionally(exception));

Review comment:
       We could also instantiate a separate error handler, right? Is the 
assumption that any error `bootstrap` receives is fatal and that continuing 
the work does not make sense? If so, we could simply use a 
`FatalErrorHandler` that terminates the process.
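
       To make the two options concrete, here is a minimal sketch, not the 
actual Flink wiring. The nested `FatalErrorHandler` interface is a local 
stand-in that only mirrors the single `onFatalError(Throwable)` method; the 
class name `ErrorHandlerSketch` and the factory methods `completingHandler` 
and `terminatingHandler` are hypothetical names used for illustration. The 
terminate action is injected as a `Runnable` so the sketch can run without 
killing the JVM.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class ErrorHandlerSketch {

    /** Local stand-in (assumption): mirrors a handler with a single onFatalError method. */
    public interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Option in the diff: forward the error into a shutdown future. */
    public static FatalErrorHandler completingHandler(CompletableFuture<Void> shutDownFuture) {
        // completeExceptionally returns a boolean, which is simply discarded here.
        return shutDownFuture::completeExceptionally;
    }

    /** Option raised in the review: treat every error as fatal and terminate. */
    public static FatalErrorHandler terminatingHandler(Runnable terminateProcess) {
        return exception -> {
            System.err.println("Fatal error, terminating: " + exception);
            // In a real process this could be something like System.exit(...).
            terminateProcess.run();
        };
    }

    public static void main(String[] args) {
        CompletableFuture<Void> shutDownFuture = new CompletableFuture<>();
        completingHandler(shutDownFuture).onFatalError(new RuntimeException("boom"));
        System.out.println("future failed: " + shutDownFuture.isCompletedExceptionally());

        AtomicBoolean terminated = new AtomicBoolean(false);
        terminatingHandler(() -> terminated.set(true)).onFatalError(new RuntimeException("boom"));
        System.out.println("terminated: " + terminated.get());
    }
}
```

       With the first handler, whoever observes the shutdown future decides 
what to do with the failure; with the second, the decision is made 
immediately at the point where the error is reported.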

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -144,19 +151,21 @@ public void stop() {
                                                if (ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
                                                        // this means the Flink Job was cancelled
                                                        applicationStatus = ApplicationStatus.CANCELED;
-                                                       LOG.warn("Application {}: ", applicationStatus, t);
+                                                       LOG.info("Application {}: ", applicationStatus, t);
 
                                                        return dispatcher.shutDownCluster(applicationStatus);
                                                }
 
                                                if (ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {
                                                        applicationStatus = ApplicationStatus.FAILED;
-                                                       LOG.warn("Application {}: ", applicationStatus, t);
+                                                       LOG.info("Application {}: ", applicationStatus, t);
 
                                                        return dispatcher.shutDownCluster(applicationStatus);
                                                }
 
-                                               return dispatcher.shutDownClusterExceptionally(t);
+                                               LOG.warn("Exiting with Application Status UNKNOWN: ", t);
+                                               errorHandler.onFatalError(t);
+                                               return CompletableFuture.completedFuture(Acknowledge.get());

Review comment:
       Why do we finish the handler with `Acknowledge` here?





