tillrohrmann commented on a change in pull request #13583:
URL: https://github.com/apache/flink/pull/13583#discussion_r503191078
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
final ApplicationStatus
applicationStatus;
if (t != null) {
- final
Optional<JobCancellationException> cancellationException =
-
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
- if
(cancellationException.isPresent()) {
+ if
(ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
// this means the Flink
Job was cancelled
applicationStatus =
ApplicationStatus.CANCELED;
- } else if (t instanceof
CancellationException) {
- // this means that the
future was cancelled
- applicationStatus =
ApplicationStatus.UNKNOWN;
- } else {
+ LOG.warn("Application
{}: ", applicationStatus, t);
Review comment:
Warn log level seems to high for a normal state transition.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
##########
@@ -82,4 +82,8 @@
default CompletableFuture<Acknowledge>
shutDownCluster(ApplicationStatus applicationStatus) {
return shutDownCluster();
}
+
+ default CompletableFuture<Acknowledge>
shutDownClusterExceptionally(final Throwable throwable) {
+ throw new UnsupportedOperationException();
+ }
Review comment:
I think we should not introduce this method. Instead the
`ApplicationDispatcherBootstrap` should get a `FatalErrorHandler` which it
simply calls if it sees an unknown exception.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
final ApplicationStatus
applicationStatus;
if (t != null) {
- final
Optional<JobCancellationException> cancellationException =
-
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
- if
(cancellationException.isPresent()) {
+ if
(ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
// this means the Flink
Job was cancelled
applicationStatus =
ApplicationStatus.CANCELED;
- } else if (t instanceof
CancellationException) {
- // this means that the
future was cancelled
- applicationStatus =
ApplicationStatus.UNKNOWN;
- } else {
+ LOG.warn("Application
{}: ", applicationStatus, t);
+
+ return
dispatcher.shutDownCluster(applicationStatus);
+ }
+
+ if
(ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {
Review comment:
Are you sure that a `JobExecutionException` means that the Flink job has
reached a globally terminal state? Looking at `JobResult.toJobExecutionResult`
it seems that we also throw this exception if the state is
`ApplicationStatus.UNKNOWN`.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
final ApplicationStatus
applicationStatus;
if (t != null) {
- final
Optional<JobCancellationException> cancellationException =
-
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
- if
(cancellationException.isPresent()) {
+ if
(ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
// this means the Flink
Job was cancelled
applicationStatus =
ApplicationStatus.CANCELED;
- } else if (t instanceof
CancellationException) {
- // this means that the
future was cancelled
- applicationStatus =
ApplicationStatus.UNKNOWN;
- } else {
+ LOG.warn("Application
{}: ", applicationStatus, t);
+
+ return
dispatcher.shutDownCluster(applicationStatus);
+ }
+
+ if
(ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {
applicationStatus =
ApplicationStatus.FAILED;
+ LOG.warn("Application
{}: ", applicationStatus, t);
Review comment:
Same here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]