dmvk commented on a change in pull request #17000:
URL: https://github.com/apache/flink/pull/17000#discussion_r698223817
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -147,35 +146,34 @@ public void stop() {
      */
     private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
             final DispatcherGateway dispatcherGateway) {
-        return applicationCompletionFuture
-                .handle(
-                        (ignored, t) -> {
-                            if (t == null) {
-                                LOG.info("Application completed SUCCESSFULLY");
-                                return dispatcherGateway.shutDownCluster(
-                                        ApplicationStatus.SUCCEEDED);
-                            }
-
-                            final Optional<UnsuccessfulExecutionException> maybeException =
-                                    ExceptionUtils.findThrowable(
-                                            t, UnsuccessfulExecutionException.class);
-                            if (maybeException.isPresent()) {
-                                final ApplicationStatus applicationStatus =
-                                        maybeException.get().getStatus();
-                                if (applicationStatus == ApplicationStatus.CANCELED
-                                        || applicationStatus == ApplicationStatus.FAILED) {
-                                    LOG.info("Application {}: ", applicationStatus, t);
-                                    return dispatcherGateway.shutDownCluster(applicationStatus);
-                                }
-                            }
-
-                            LOG.warn("Application failed unexpectedly: ", t);
-                            this.errorHandler.onFatalError(
-                                    new FlinkException("Application failed unexpectedly.", t));
-
-                            return FutureUtils.<Acknowledge>completedExceptionally(t);
-                        })
-                .thenCompose(Function.identity());
+        final CompletableFuture<Acknowledge> shutdownFuture =
+                applicationCompletionFuture
+                        .handle(
+                                (ignored, t) -> {
+                                    if (t == null) {
+                                        LOG.info("Application completed SUCCESSFULLY");
+                                        return dispatcherGateway.shutDownCluster(
+                                                ApplicationStatus.SUCCEEDED);
+                                    }
+                                    final Optional<UnsuccessfulExecutionException> maybeException =
+                                            ExceptionUtils.findThrowable(
+                                                    t, UnsuccessfulExecutionException.class);
+                                    if (maybeException.isPresent()) {
+                                        final ApplicationStatus applicationStatus =
+                                                maybeException.get().getStatus();
+                                        if (applicationStatus == ApplicationStatus.CANCELED
+                                                || applicationStatus == ApplicationStatus.FAILED) {
+                                            LOG.info("Application {}: ", applicationStatus, t);
+                                            return dispatcherGateway.shutDownCluster(
+                                                    applicationStatus);
+                                        }
+                                    }
+                                    LOG.warn("Application failed unexpectedly: ", t);
+                                    return dispatcherGateway.shutDownClusterExceptionally(t);
+                                })
Review comment:
Hmm, I think I may have misinterpreted the issue description then 🤔 I
initially thought that we don't want any exceptions (even from the user code)
to shut down the process fatally.