zentol commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r489283049
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
@Nonnull ResourceManager<?> resourceManager,
@Nonnull LeaderRetrievalService
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService
resourceManagerRetrievalService,
- @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+ @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+ @Nonnull FatalErrorHandler fatalErrorHandler,
+ @Nonnull CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService =
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService =
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+ this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
registerShutDownFuture();
+ failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(),
shutDownFuture);
}
+ private void
failOnPrematureTermination(CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
+ dispatcherGatewayCompletableFuture.whenComplete((dispatcher,
throwable) -> {
+ if (dispatcher != null && dispatcher instanceof
Dispatcher) {
+ CompletableFuture.anyOf(((Dispatcher)
dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+ .whenComplete((ignored, t) -> {
+ if (isRunning.get()) {
+
fatalErrorHandler.onFatalError(t);
+ }
+ });
+ } else {
+ LOG.warn("DispatcherGateway({}) not instance of
Dispatcher or failed to get DispatcherGateway!", dispatcher, throwable);
Review comment:
This kind of implementation details should not be written to the logs.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
@Nonnull ResourceManager<?> resourceManager,
@Nonnull LeaderRetrievalService
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService
resourceManagerRetrievalService,
- @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+ @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+ @Nonnull FatalErrorHandler fatalErrorHandler,
+ @Nonnull CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService =
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService =
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+ this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
registerShutDownFuture();
+ failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(),
shutDownFuture);
}
+ private void
failOnPrematureTermination(CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
+ dispatcherGatewayCompletableFuture.whenComplete((dispatcher,
throwable) -> {
+ if (dispatcher != null && dispatcher instanceof
Dispatcher) {
Review comment:
This breaks the DispatcherGateway abstraction and reinforces the
assumption that the RM and Dispatcher run in the same JVM which may change in
the future.
Why can we not use the shutdown future of the `DispatcherRunner`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
@Nonnull ResourceManager<?> resourceManager,
@Nonnull LeaderRetrievalService
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService
resourceManagerRetrievalService,
- @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+ @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+ @Nonnull FatalErrorHandler fatalErrorHandler,
+ @Nonnull CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService =
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService =
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+ this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
registerShutDownFuture();
+ failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(),
shutDownFuture);
}
+ private void
failOnPrematureTermination(CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
+ dispatcherGatewayCompletableFuture.whenComplete((dispatcher,
throwable) -> {
+ if (dispatcher != null && dispatcher instanceof
Dispatcher) {
+ CompletableFuture.anyOf(((Dispatcher)
dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+ .whenComplete((ignored, t) -> {
+ if (isRunning.get()) {
+
fatalErrorHandler.onFatalError(t);
Review comment:
It would be good to log a warning here that the DRMC shut down because
the dispatcher/rm unexpected terminated.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -74,22 +79,46 @@
@Nonnull ResourceManager<?> resourceManager,
@Nonnull LeaderRetrievalService
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService
resourceManagerRetrievalService,
- @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint) {
+ @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+ @Nonnull FatalErrorHandler fatalErrorHandler,
+ @Nonnull CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService =
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService =
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+ this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
registerShutDownFuture();
+ failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(),
shutDownFuture);
}
+ private void
failOnPrematureTermination(CompletableFuture<DispatcherGateway>
dispatcherGatewayCompletableFuture) {
+ dispatcherGatewayCompletableFuture.whenComplete((dispatcher,
throwable) -> {
+ if (dispatcher != null && dispatcher instanceof
Dispatcher) {
+ CompletableFuture.anyOf(((Dispatcher)
dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+ .whenComplete((ignored, t) -> {
Review comment:
It may be a good idea to check whether t is indeed not null and log
something accordingly if this is not the case. If it is null, then the process
shut down without an error unexpectedly, which if I understand things correctly
should not happen.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]