This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61a9f25cd827d09e64a9a3f39c0fb85639007b89
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Oct 21 21:00:27 2020 +0200

    [hotfix] minor refactoring in ApplicationDispatcherBootstrap
---
 .../ApplicationDispatcherBootstrap.java            | 32 +++++++++++-----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index 1443804..48dd704 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -142,28 +142,28 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
        private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(final DispatcherGateway dispatcherGateway) {
                return applicationCompletionFuture
                                .handle((r, t) -> {
-                                       if (t != null) {
 
-                                               final 
Optional<ApplicationFailureException> exception =
-                                                               
ExceptionUtils.findThrowable(t, ApplicationFailureException.class);
-
-                                               if (exception.isPresent()) {
-                                                       final ApplicationStatus 
applicationStatus = exception.get().getStatus();
+                                       if (t == null) {
+                                               LOG.info("Application completed 
SUCCESSFULLY");
+                                               return 
dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+                                       }
 
-                                                       if (applicationStatus 
== ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) 
{
-                                                               
LOG.info("Application {}: ", applicationStatus, t);
-                                                               return 
dispatcherGateway.shutDownCluster(applicationStatus);
-                                                       }
-                                               }
+                                       final 
Optional<ApplicationFailureException> exception =
+                                                       
ExceptionUtils.findThrowable(t, ApplicationFailureException.class);
 
-                                               LOG.warn("Exiting with 
Application Status UNKNOWN: ", t);
-                                               
this.errorHandler.onFatalError(new FlinkException("Application failed 
unexpectedly.", t));
+                                       if (exception.isPresent()) {
+                                               final ApplicationStatus 
applicationStatus = exception.get().getStatus();
 
-                                               return 
FutureUtils.<Acknowledge>completedExceptionally(t);
+                                               if (applicationStatus == 
ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) {
+                                                       LOG.info("Application 
{}: ", applicationStatus, t);
+                                                       return 
dispatcherGateway.shutDownCluster(applicationStatus);
+                                               }
                                        }
 
-                                       LOG.info("Application completed 
SUCCESSFULLY");
-                                       return 
dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+                                       LOG.warn("Application failed 
unexpectedly: ", t);
+                                       this.errorHandler.onFatalError(new 
FlinkException("Application failed unexpectedly.", t));
+
+                                       return 
FutureUtils.<Acknowledge>completedExceptionally(t);
                                })
                                .thenCompose(Function.identity());
        }

Reply via email to