tillrohrmann commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r483078088



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -194,8 +194,9 @@ public Dispatcher(
        public void onStart() throws Exception {
                try {
                        startDispatcherServices();
-               } catch (Exception e) {
-                       final DispatcherException exception = new 
DispatcherException(String.format("Could not start the Dispatcher %s", 
getAddress()), e);
+               } catch (Throwable t) {
+                       getTerminationFuture().completeExceptionally(t);

Review comment:
       This should not be necessary.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##########
@@ -513,10 +513,10 @@ public State terminate(AkkaRpcActor<?> akkaRpcActor) {
                        try {
                                terminationFuture = 
akkaRpcActor.rpcEndpoint.internalCallOnStop();
                        } catch (Throwable t) {
-                               terminationFuture = 
FutureUtils.completedExceptionally(
-                                       new AkkaRpcException(
-                                               String.format("Failure while 
stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
-                                               t));
+                               String errorMsg = String.format("Failure while 
stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
+                               
LoggerFactory.getLogger(akkaRpcActor.rpcEndpoint.getClass()).error(errorMsg, t);
+                               terminationFuture = 
FutureUtils.completedExceptionally(new AkkaRpcException(errorMsg, t));
+                               
akkaRpcActor.rpcEndpoint.getTerminationFuture().completeExceptionally(t);

Review comment:
       This should not be necessary.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -88,6 +95,18 @@
 
        private void registerShutDownFuture() {
                FutureUtils.forward(dispatcherRunner.getShutDownFuture(), 
shutDownFuture);
+               BiConsumer<Object, Throwable> terminateAction = (ignored, 
throwable) -> {
+                       if (throwable != null) {
+                               shutDownFuture.completeExceptionally(throwable);
+                       } else {
+                               
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                       }
+                       if (isRunning.get()) {
+                               fatalErrorHandler.onFatalError(throwable);
+                       }
+               };
+               
dispatcherRunner.getTerminationFuture().whenComplete(terminateAction);

Review comment:
       I think the `DispatcherRunner.getTerminationFuture` won't complete if 
the `Dispatcher` terminates. The problem is that we only forward the result of 
the `DispatcherLeaderProcess.terminationFuture` to the 
`DispatcherRunner.terminationFuture` if we call `closeAsync`. I would suggest 
to add tests to verify the intended behavior.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
##########
@@ -107,13 +112,18 @@ public void grantLeadership(UUID leaderSessionID) {
        }
 
        private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
-               stopDispatcherLeaderProcess();
+               try {
+                       stopDispatcherLeaderProcess();
 
-               dispatcherLeaderProcess = 
createNewDispatcherLeaderProcess(leaderSessionID);
+                       dispatcherLeaderProcess = 
createNewDispatcherLeaderProcess(leaderSessionID);
 
-               final DispatcherLeaderProcess newDispatcherLeaderProcess = 
dispatcherLeaderProcess;
-               FutureUtils.assertNoException(
-                       
previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
+                       final DispatcherLeaderProcess 
newDispatcherLeaderProcess = dispatcherLeaderProcess;
+                       FutureUtils.assertNoException(
+                               
previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
+               } catch (Throwable t) {
+                       terminationFuture.completeExceptionally(t);
+                       throw t;

Review comment:
       Why is this needed?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -88,6 +95,18 @@
 
        private void registerShutDownFuture() {
                FutureUtils.forward(dispatcherRunner.getShutDownFuture(), 
shutDownFuture);
+               BiConsumer<Object, Throwable> terminateAction = (ignored, 
throwable) -> {
+                       if (throwable != null) {
+                               shutDownFuture.completeExceptionally(throwable);
+                       } else {
+                               
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                       }
+                       if (isRunning.get()) {
+                               fatalErrorHandler.onFatalError(throwable);
+                       }
+               };
+               
dispatcherRunner.getTerminationFuture().whenComplete(terminateAction);
+               
resourceManager.getTerminationFuture().whenComplete(terminateAction);

Review comment:
       I'd move this into a `failOnPrematureTermination` method and combine 
both termination futures with `CompletableFuture.anyOf`. Moreover, we should not 
complete the `shutDownFuture`. Simply calling the `fatalErrorHandler` is 
sufficient.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##########
@@ -548,11 +548,10 @@ public State start(AkkaRpcActor<?> akkaRpcActor) {
                        try {
                                akkaRpcActor.rpcEndpoint.internalCallOnStart();
                        } catch (Throwable throwable) {
-                               akkaRpcActor.stop(
-                                       RpcEndpointTerminationResult.failure(
-                                               new AkkaRpcException(
-                                                       String.format("Could 
not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
-                                                       throwable)));
+                               String errorMsg = String.format("Could not 
start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
+                               
LoggerFactory.getLogger(akkaRpcActor.rpcEndpoint.getClass()).error(errorMsg, 
throwable);
+                               
akkaRpcActor.stop(RpcEndpointTerminationResult.failure(new 
AkkaRpcException(errorMsg, throwable)));
+                               
akkaRpcActor.rpcEndpoint.getTerminationFuture().completeExceptionally(throwable);

Review comment:
       Same here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to