rkhachatryan commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r438652781
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
+ Throwable suppressedThrowable = null;
// stop all asynchronous checkpoint threads
try {
cancelables.close();
shutdownAsyncThreads();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
Review comment:
It would be nice to preserve the `"Could not shut down async checkpoint threads"` message (by wrapping `t`), but the stack trace is probably enough.
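To illustrate what wrapping `t` could look like, here is a minimal sketch. The class `WrapMessageSketch` and the helper `withContext` are hypothetical names, not part of Flink or of this PR; the only point is that the wrapper keeps both the old log message and the original throwable as its cause.

```java
final class WrapMessageSketch {

    /**
     * Hypothetical helper: wraps a failure so that the context message
     * from the removed LOG.error call is kept, with the original
     * throwable (and its stack trace) preserved as the cause.
     */
    static Exception withContext(String message, Throwable cause) {
        return new Exception(message, cause);
    }
}
```

In the catch block this might be used as `ExceptionUtils.firstOrSuppressed(withContext("Could not shut down async checkpoint threads", t), suppressedThrowable)`, assuming the extra wrapper level in the stack trace is acceptable.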
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
Review comment:
This call has the same try-catch-log inside. Can we apply the same
approach to it?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
afterInvoke();
}
catch (Exception invokeException) {
+ LOG.error("Error while running task " + getTaskNameWithSubtaskAndId(), invokeException);
Review comment:
nit: can we use a `{}` placeholder instead of string concatenation here?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
+ Throwable suppressedThrowable = null;
// stop all asynchronous checkpoint threads
try {
cancelables.close();
shutdownAsyncThreads();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
}
// we must! perform this cleanup
try {
cleanup();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Error during cleanup of stream task", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
}
// if the operators were not disposed before, do a hard dispose
- disposeAllOperators(true);
+ try {
+ disposeAllOperators(true);
+ } catch (Throwable t) {
+ // catch and suppress all the disposal exceptions
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
Review comment:
Ditto about the comment and the lost error message.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +714,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
operator.dispose();
}
catch (Exception e) {
Review comment:
This is a bit inconsistent with other changes where we catch
`Throwable`, not `Exception`.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
+ Throwable suppressedThrowable = null;
// stop all asynchronous checkpoint threads
try {
cancelables.close();
shutdownAsyncThreads();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
+ // catch and suppress the exception to not replace the original exception
Review comment:
nit: this comment doesn't seem very useful. I'd remove it.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
+ Throwable suppressedThrowable = null;
// stop all asynchronous checkpoint threads
try {
cancelables.close();
shutdownAsyncThreads();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
}
// we must! perform this cleanup
try {
cleanup();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Error during cleanup of stream task", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
}
// if the operators were not disposed before, do a hard dispose
- disposeAllOperators(true);
+ try {
+ disposeAllOperators(true);
Review comment:
I guess we can remove the `logOnlyErrors` argument now, as we are catching exceptions.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -646,10 +653,14 @@ protected void cleanUpInvoke() throws Exception {
try {
channelIOExecutor.shutdown();
} catch (Throwable t) {
- LOG.error("Error during shutdown the channel state unspill executor", t);
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
}
mailboxProcessor.close();
+
+ if (suppressedThrowable != null) {
+ throw (Exception) suppressedThrowable;
Review comment:
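I think `suppressedThrowable` doesn't have to be an `Exception`: the blocks above catch `Throwable`, so this cast can fail with a `ClassCastException` (for example, when an `Error` was caught).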
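One way to avoid the cast, as a rough sketch only: rethrow `Error` and `Exception` as they are and wrap anything else. `RethrowSketch` and `rethrowIfPresent` are hypothetical names for illustration, not existing Flink utilities.

```java
final class RethrowSketch {

    /**
     * Hypothetical helper: rethrows a collected throwable without
     * assuming that it is an Exception. Does nothing for null.
     */
    static void rethrowIfPresent(Throwable collected) throws Exception {
        if (collected == null) {
            return;
        }
        if (collected instanceof Error) {
            // Errors (e.g. OutOfMemoryError) propagate unchanged.
            throw (Error) collected;
        }
        if (collected instanceof Exception) {
            throw (Exception) collected;
        }
        // A bare Throwable subclass: wrap it so the signature still holds.
        throw new Exception(collected);
    }
}
```

With something like this, the end of `cleanUpInvoke()` could call `rethrowIfPresent(suppressedThrowable)` instead of casting.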
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
+ Throwable suppressedThrowable = null;
// stop all asynchronous checkpoint threads
try {
cancelables.close();
shutdownAsyncThreads();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
}
// we must! perform this cleanup
try {
cleanup();
} catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Error during cleanup of stream task", t);
+ // catch and suppress the exception to not replace the original exception
+ suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
Review comment:
Ditto about the comment and losing the error message.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
// stop all timers and threads
tryShutdownTimerService();
+ Throwable suppressedThrowable = null;
Review comment:
So we now have 4-5 similar try-catch blocks one after another.
I guess we can't use Guava `Closer` because we want to handle any type of exception, right?
And not `CloseableRegistry`, because it closes silently.
Maybe we can emulate it by keeping a collection of closeables and closing them in a loop:
```
List<ThrowingRunnable<?>> runs = asList(
    cancelables::close,
    this::shutdownAsyncThreads,
    this::cleanup,
    this::disposeAllOperators,
    ...);
Throwable suppressedThrowable = null;
for (ThrowingRunnable<?> run : runs) {
    try {
        run.run();
    } catch (Throwable t) {
        suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
    }
}
```
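For reference, a self-contained version of the loop idea that compiles without Flink on the classpath. This is only a sketch: `ThrowingAction` is a hypothetical stand-in for `ThrowingRunnable`, and the local `firstOrSuppressed` is an assumed approximation of what `ExceptionUtils.firstOrSuppressed` does, not its actual source.

```java
import java.util.List;

final class CleanupLoopSketch {

    /** Hypothetical stand-in for Flink's ThrowingRunnable. */
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Throwable;
    }

    /** Keeps the first throwable and attaches later ones to it as suppressed. */
    static Throwable firstOrSuppressed(Throwable newThrowable, Throwable previous) {
        if (previous == null) {
            return newThrowable;
        }
        // addSuppressed rejects self-suppression, so guard against it.
        if (previous != newThrowable) {
            previous.addSuppressed(newThrowable);
        }
        return previous;
    }

    /**
     * Runs every action, even if earlier ones fail.
     * Returns the first failure with later failures suppressed, or null.
     */
    static Throwable runAll(List<ThrowingAction> actions) {
        Throwable collected = null;
        for (ThrowingAction action : actions) {
            try {
                action.run();
            } catch (Throwable t) {
                collected = firstOrSuppressed(t, collected);
            }
        }
        return collected;
    }
}
```

The list preserves the shutdown order, and a failing step no longer prevents the later steps from running. Methods that take arguments, such as `disposeAllOperators(true)`, would need a lambda rather than a method reference.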