zentol commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r489283049



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
                        @Nonnull ResourceManager<?> resourceManager,
                        @Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
                        @Nonnull LeaderRetrievalService resourceManagerRetrievalService,
-                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+                       @Nonnull FatalErrorHandler fatalErrorHandler,
+                       @Nonnull CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
                this.dispatcherRunner = dispatcherRunner;
                this.resourceManager = resourceManager;
                this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService;
                this.resourceManagerRetrievalService = resourceManagerRetrievalService;
                this.webMonitorEndpoint = webMonitorEndpoint;
+               this.fatalErrorHandler = fatalErrorHandler;
                this.terminationFuture = new CompletableFuture<>();
                this.shutDownFuture = new CompletableFuture<>();
 
                registerShutDownFuture();
+               failOnPrematureTermination(dispatcherGatewayCompletableFuture);
        }
 
        private void registerShutDownFuture() {
                FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
        }
 
+       private void failOnPrematureTermination(CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
+               dispatcherGatewayCompletableFuture.whenComplete((dispatcher, throwable) -> {
+                       if (dispatcher != null && dispatcher instanceof Dispatcher) {
+                               CompletableFuture.anyOf(((Dispatcher) dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+                                       .whenComplete((ignored, t) -> {
+                                               if (isRunning.get()) {
+                                                       fatalErrorHandler.onFatalError(t);
+                                               }
+                                       });
+                       } else {
+                               LOG.warn("DispatcherGateway({}) not instance of Dispatcher or failed to get DispatcherGateway!", dispatcher, throwable);

Review comment:
       Implementation details like this should not be written to the logs.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
                        @Nonnull ResourceManager<?> resourceManager,
                        @Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
                        @Nonnull LeaderRetrievalService resourceManagerRetrievalService,
-                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+                       @Nonnull FatalErrorHandler fatalErrorHandler,
+                       @Nonnull CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
                this.dispatcherRunner = dispatcherRunner;
                this.resourceManager = resourceManager;
                this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService;
                this.resourceManagerRetrievalService = resourceManagerRetrievalService;
                this.webMonitorEndpoint = webMonitorEndpoint;
+               this.fatalErrorHandler = fatalErrorHandler;
                this.terminationFuture = new CompletableFuture<>();
                this.shutDownFuture = new CompletableFuture<>();
 
                registerShutDownFuture();
+               failOnPrematureTermination(dispatcherGatewayCompletableFuture);
        }
 
        private void registerShutDownFuture() {
                FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
        }
 
+       private void failOnPrematureTermination(CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
+               dispatcherGatewayCompletableFuture.whenComplete((dispatcher, throwable) -> {
+                       if (dispatcher != null && dispatcher instanceof Dispatcher) {

Review comment:
       This breaks the DispatcherGateway abstraction and reinforces the
assumption that the RM and Dispatcher run in the same JVM, which may change in
the future.
   Why can we not use the shutdown future of the `DispatcherRunner` instead?
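
   A minimal sketch of that alternative, assuming the runner's shutdown future
completes whenever the dispatcher goes away (I have not verified that it does
in every case). Plain JDK futures and a `Consumer<Throwable>` stand in for
`DispatcherRunner`, `ResourceManager` and `FatalErrorHandler`; the class and
method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch: none of these names are Flink APIs.
public class ShutDownFutureSketch {

    /**
     * Watches two futures that the component already has access to, so no
     * cast from a gateway to a concrete implementation is needed.
     */
    public static void failOnPrematureTermination(
            CompletableFuture<?> runnerShutDownFuture,
            CompletableFuture<?> resourceManagerTerminationFuture,
            AtomicBoolean isRunning,
            Consumer<Throwable> fatalErrorHandler) {

        CompletableFuture
            .anyOf(runnerShutDownFuture, resourceManagerTerminationFuture)
            .whenComplete((ignored, t) -> {
                // Only escalate if the component did not ask for the shutdown itself.
                // Note that t is null when the future completed without an error.
                if (isRunning.get()) {
                    fatalErrorHandler.accept(t);
                }
            });
    }
}
```

   The point is only that the component depends on futures it is handed
directly, which keeps working if the Dispatcher ever runs in another process.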

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
                        @Nonnull ResourceManager<?> resourceManager,
                        @Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
                        @Nonnull LeaderRetrievalService resourceManagerRetrievalService,
-                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+                       @Nonnull FatalErrorHandler fatalErrorHandler,
+                       @Nonnull CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
                this.dispatcherRunner = dispatcherRunner;
                this.resourceManager = resourceManager;
                this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService;
                this.resourceManagerRetrievalService = resourceManagerRetrievalService;
                this.webMonitorEndpoint = webMonitorEndpoint;
+               this.fatalErrorHandler = fatalErrorHandler;
                this.terminationFuture = new CompletableFuture<>();
                this.shutDownFuture = new CompletableFuture<>();
 
                registerShutDownFuture();
+               failOnPrematureTermination(dispatcherGatewayCompletableFuture);
        }
 
        private void registerShutDownFuture() {
                FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
        }
 
+       private void failOnPrematureTermination(CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
+               dispatcherGatewayCompletableFuture.whenComplete((dispatcher, throwable) -> {
+                       if (dispatcher != null && dispatcher instanceof Dispatcher) {
+                               CompletableFuture.anyOf(((Dispatcher) dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+                                       .whenComplete((ignored, t) -> {
+                                               if (isRunning.get()) {
+                                                       fatalErrorHandler.onFatalError(t);

Review comment:
       It would be good to log a warning here that the DRMC shut down because
the Dispatcher/RM terminated unexpectedly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
                        @Nonnull ResourceManager<?> resourceManager,
                        @Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
                        @Nonnull LeaderRetrievalService resourceManagerRetrievalService,
-                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+                       @Nonnull FatalErrorHandler fatalErrorHandler,
+                       @Nonnull CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
                this.dispatcherRunner = dispatcherRunner;
                this.resourceManager = resourceManager;
                this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService;
                this.resourceManagerRetrievalService = resourceManagerRetrievalService;
                this.webMonitorEndpoint = webMonitorEndpoint;
+               this.fatalErrorHandler = fatalErrorHandler;
                this.terminationFuture = new CompletableFuture<>();
                this.shutDownFuture = new CompletableFuture<>();
 
                registerShutDownFuture();
+               failOnPrematureTermination(dispatcherGatewayCompletableFuture);
        }
 
        private void registerShutDownFuture() {
                FutureUtils.forward(dispatcherRunner.getShutDownFuture(), shutDownFuture);
        }
 
+       private void failOnPrematureTermination(CompletableFuture<DispatcherGateway> dispatcherGatewayCompletableFuture) {
+               dispatcherGatewayCompletableFuture.whenComplete((dispatcher, throwable) -> {
+                       if (dispatcher != null && dispatcher instanceof Dispatcher) {
+                               CompletableFuture.anyOf(((Dispatcher) dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+                                       .whenComplete((ignored, t) -> {

Review comment:
       It may be a good idea to check whether `t` is actually non-null and log
something if it is not. If `t` is null, the component terminated unexpectedly
without an error, which, if I understand things correctly, should not happen.
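
   Something along these lines, as a sketch only. It uses plain JDK types:
`java.util.logging` stands in for the class's `LOG`, a `Consumer<Throwable>`
stands in for the `FatalErrorHandler`, and `toFatalError` and `watch` are
hypothetical helper names, not existing Flink methods:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.logging.Logger;

// Hypothetical sketch: none of these names are Flink APIs.
public class TerminationCauseSketch {
    private static final Logger LOG =
        Logger.getLogger(TerminationCauseSketch.class.getName());

    /** Never returns null, so the handler always gets a meaningful cause. */
    public static Throwable toFatalError(Throwable t) {
        if (t == null) {
            // The future completed normally: a termination without an error.
            return new IllegalStateException(
                "Dispatcher or ResourceManager terminated unexpectedly without an error.");
        }
        // A failure propagated through anyOf() may arrive wrapped in a
        // CompletionException; report the underlying cause when there is one.
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    public static void watch(
            CompletableFuture<?> dispatcherTermination,
            CompletableFuture<?> resourceManagerTermination,
            BooleanSupplier isRunning,
            Consumer<Throwable> fatalErrorHandler) {

        CompletableFuture
            .anyOf(dispatcherTermination, resourceManagerTermination)
            .whenComplete((ignored, t) -> {
                if (isRunning.getAsBoolean()) {
                    Throwable cause = toFatalError(t);
                    LOG.warning("Shutting down because the Dispatcher or "
                        + "ResourceManager terminated unexpectedly: " + cause);
                    fatalErrorHandler.accept(cause);
                }
            });
    }
}
```

   That would also cover the warning requested in the previous comment.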



