tillrohrmann commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r483078088
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -194,8 +194,9 @@ public Dispatcher(
     public void onStart() throws Exception {
         try {
             startDispatcherServices();
-        } catch (Exception e) {
-            final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
+        } catch (Throwable t) {
+            getTerminationFuture().completeExceptionally(t);
Review comment:
This should not be necessary.
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##########
@@ -513,10 +513,10 @@ public State terminate(AkkaRpcActor<?> akkaRpcActor) {
         try {
             terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
         } catch (Throwable t) {
-            terminationFuture = FutureUtils.completedExceptionally(
-                new AkkaRpcException(
-                    String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
-                    t));
+            String errorMsg = String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
+            LoggerFactory.getLogger(akkaRpcActor.rpcEndpoint.getClass()).error(errorMsg, t);
+            terminationFuture = FutureUtils.completedExceptionally(new AkkaRpcException(errorMsg, t));
+            akkaRpcActor.rpcEndpoint.getTerminationFuture().completeExceptionally(t);
Review comment:
This should not be necessary.
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -88,6 +95,18 @@
     private void registerShutDownFuture() {
         FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
+        BiConsumer<Object, Throwable> terminateAction = (ignored, throwable) -> {
+            if (throwable != null) {
+                shutDownFuture.completeExceptionally(throwable);
+            } else {
+                shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+            }
+            if (isRunning.get()) {
+                fatalErrorHandler.onFatalError(throwable);
+            }
+        };
+        dispatcherRunner.getTerminationFuture().whenComplete(terminateAction);
Review comment:
I think the `DispatcherRunner.getTerminationFuture` won't complete if
the `Dispatcher` terminates. The problem is that we only forward the result of
the `DispatcherLeaderProcess.terminationFuture` to the
`DispatcherRunner.terminationFuture` when `closeAsync` is called. I would
suggest adding tests to verify the intended behavior.
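To make the concern above concrete, here is a minimal, self-contained model built on plain `CompletableFuture`s. `TerminationForwardingSketch`, `LeaderProcess`, `LazyRunner`, `EagerRunner` and `forward` are hypothetical names, not Flink classes: `LeaderProcess` plays the role of `DispatcherLeaderProcess`, and the two runners stand in for `DispatcherRunner`. The real runner also replaces leader processes on leadership changes, which this sketch ignores. `EagerRunner` is only one possible way to surface a self-terminating leader process, assumed here for illustration; the checks in `main` show the kind of behavior a test could pin down.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the forwarding issue; none of these are real Flink types.
public class TerminationForwardingSketch {

    // Completes target with the same value or failure as source, once source completes.
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, throwable) -> {
            if (throwable != null) {
                target.completeExceptionally(throwable);
            } else {
                target.complete(value);
            }
        });
    }

    // Stand-in for DispatcherLeaderProcess: only its termination future matters here.
    static final class LeaderProcess {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    }

    // Mirrors the behavior described in the review: the leader's termination
    // is only forwarded once closeAsync() has been called.
    static final class LazyRunner {
        final LeaderProcess leader = new LeaderProcess();
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        CompletableFuture<Void> closeAsync() {
            forward(leader.terminationFuture, terminationFuture);
            return terminationFuture;
        }
    }

    // Assumed alternative for illustration: forward as soon as the leader exists.
    static final class EagerRunner {
        final LeaderProcess leader = new LeaderProcess();
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        EagerRunner() {
            forward(leader.terminationFuture, terminationFuture);
        }
    }

    public static void main(String[] args) {
        // The leader process fails on its own; nobody has called closeAsync().
        LazyRunner lazy = new LazyRunner();
        lazy.leader.terminationFuture.completeExceptionally(new RuntimeException("leader failed"));
        System.out.println("lazy runner terminated: " + lazy.terminationFuture.isDone());

        EagerRunner eager = new EagerRunner();
        eager.leader.terminationFuture.completeExceptionally(new RuntimeException("leader failed"));
        System.out.println("eager runner terminated: " + eager.terminationFuture.isDone());
    }
}
```

Running `main` prints `lazy runner terminated: false` followed by `eager runner terminated: true`: with lazy forwarding, a failed leader process stays invisible to anyone watching the runner's termination future until `closeAsync` is called.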
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
##########
@@ -107,13 +112,18 @@ public void grantLeadership(UUID leaderSessionID) {
     }
     private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
-        stopDispatcherLeaderProcess();
+        try {
+            stopDispatcherLeaderProcess();
-        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
+            dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
-        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
-        FutureUtils.assertNoException(
-            previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
+            final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
+            FutureUtils.assertNoException(
+                previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
+        } catch (Throwable t) {
+            terminationFuture.completeExceptionally(t);
+            throw t;
Review comment:
Why is this needed?
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -88,6 +95,18 @@
     private void registerShutDownFuture() {
         FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
+        BiConsumer<Object, Throwable> terminateAction = (ignored, throwable) -> {
+            if (throwable != null) {
+                shutDownFuture.completeExceptionally(throwable);
+            } else {
+                shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+            }
+            if (isRunning.get()) {
+                fatalErrorHandler.onFatalError(throwable);
+            }
+        };
+        dispatcherRunner.getTerminationFuture().whenComplete(terminateAction);
+        resourceManager.getTerminationFuture().whenComplete(terminateAction);
Review comment:
I'd move this into a `failOnPrematureTermination` method and combine
both termination futures with `CompletableFuture.anyOf`. Moreover, we should
not complete the `shutDownFuture`; simply calling the `fatalErrorHandler` is
good enough.
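The suggested refactoring could look roughly like the following sketch. `failOnPrematureTermination` is the name proposed in the comment; `PrematureTerminationSketch`, its nested `FatalErrorHandler` (a local stand-in, not the Flink interface) and `stop()` are hypothetical. Of the calls used, only the JDK's `CompletableFuture.anyOf` is a real API. Following the comment, the sketch leaves the `shutDownFuture` alone and only reports to the fatal error handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the suggested method; not the actual Flink component.
public class PrematureTerminationSketch {

    // Minimal local stand-in for a fatal error handler.
    interface FatalErrorHandler {
        void onFatalError(Throwable throwable);
    }

    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final FatalErrorHandler fatalErrorHandler;

    PrematureTerminationSketch(FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    // Reports a fatal error as soon as either component terminates while this
    // component is still running. The shutDownFuture is deliberately not touched.
    void failOnPrematureTermination(
            CompletableFuture<?> dispatcherTerminationFuture,
            CompletableFuture<?> resourceManagerTerminationFuture) {
        CompletableFuture.anyOf(dispatcherTerminationFuture, resourceManagerTerminationFuture)
            .whenComplete((ignored, throwable) -> {
                if (!isRunning.get()) {
                    return; // regular shutdown, nothing to report
                }
                // anyOf wraps a failure in a CompletionException; report the original cause.
                Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;
                // A normal completion while still running is premature as well, but carries
                // no exception, so one is created here (an assumption about the desired behavior).
                fatalErrorHandler.onFatalError(cause != null
                    ? cause
                    : new IllegalStateException("Component terminated unexpectedly."));
            });
    }

    // Called on a regular shutdown, before the sub-components are stopped.
    void stop() {
        isRunning.set(false);
    }

    public static void main(String[] args) {
        List<Throwable> reported = new ArrayList<>();
        PrematureTerminationSketch component = new PrematureTerminationSketch(reported::add);
        CompletableFuture<Void> dispatcher = new CompletableFuture<>();
        CompletableFuture<Void> resourceManager = new CompletableFuture<>();

        component.failOnPrematureTermination(dispatcher, resourceManager);
        resourceManager.completeExceptionally(new RuntimeException("ResourceManager crashed"));

        System.out.println(reported.get(0).getMessage()); // prints "ResourceManager crashed"
    }
}
```

Because `anyOf` completes with the first of its inputs, a single `whenComplete` callback replaces the two separate registrations in the diff. Creating an `IllegalStateException` on normal completion is a design choice of this sketch: the diff passes `throwable` through unchanged, which is `null` when a component terminates without an error.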
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##########
@@ -548,11 +548,10 @@ public State start(AkkaRpcActor<?> akkaRpcActor) {
         try {
             akkaRpcActor.rpcEndpoint.internalCallOnStart();
         } catch (Throwable throwable) {
-            akkaRpcActor.stop(
-                RpcEndpointTerminationResult.failure(
-                    new AkkaRpcException(
-                        String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
-                        throwable)));
+            String errorMsg = String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
+            LoggerFactory.getLogger(akkaRpcActor.rpcEndpoint.getClass()).error(errorMsg, throwable);
+            akkaRpcActor.stop(RpcEndpointTerminationResult.failure(new AkkaRpcException(errorMsg, throwable)));
+            akkaRpcActor.rpcEndpoint.getTerminationFuture().completeExceptionally(throwable);
Review comment:
Same here: this should not be necessary.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.