rkhachatryan commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r440237448
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -592,7 +592,7 @@ protected void afterInvoke() throws Exception {
// make an attempt to dispose the operators such that failures in the dispose call
// still let the computation fail
- disposeAllOperators(false);
+ disposeAllOperators();
disposedOperators = true;
Review comment:
This line is now unnecessary because `disposedOperators` is updated in
`disposeAllOperators` before throwing an exception.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -686,27 +680,38 @@ private void shutdownAsyncThreads() throws Exception {
}
}
+ private Exception suppressThrowable(ThrowingRunnable<?> runnable, Exception suppressedException) {
Review comment:
1. The method name `suppressThrowable` doesn't say that it runs something. Maybe `runSuppressed`?
2. The argument name `suppressedException` is a bit misleading, because it is `e` that will be suppressed. Maybe `originalException`?
3. Add `@Nullable` to the 2nd argument?
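
The three suggestions above could look roughly like the sketch below. It is a self-contained illustration, not the code from the PR: `ThrowingAction` is a hypothetical stand-in for Flink's `ThrowingRunnable`, `@Nullable` appears only as a comment to avoid a dependency, and the sketch catches `Exception` rather than `Throwable` for brevity.

```java
final class RunSuppressedSketch {

    /** Hypothetical stand-in for Flink's ThrowingRunnable, declared here so the sketch compiles alone. */
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }

    /**
     * Runs the action and folds a failure into the exception seen so far.
     * originalException is the one that survives; it may be null when
     * nothing has failed yet.
     */
    static Exception runSuppressed(ThrowingAction action, /* @Nullable */ Exception originalException) {
        try {
            action.run();
        } catch (Exception e) {
            if (originalException == null || originalException == e) {
                return e; // first failure: it becomes the exception to report
            }
            originalException.addSuppressed(e); // later failure: e is the suppressed one
        }
        return originalException;
    }

    public static void main(String[] args) {
        Exception failure = runSuppressed(() -> { throw new IllegalStateException("timer service"); }, null);
        failure = runSuppressed(() -> { throw new java.io.IOException("cancelables"); }, failure);
        System.out.println(failure + " suppressed=" + failure.getSuppressed().length);
    }
}
```

With this naming, the second parameter is the exception that is kept, and whatever the action throws is what ends up suppressed.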
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -611,27 +611,21 @@ protected void cleanUpInvoke() throws Exception {
Thread.interrupted();
// stop all timers and threads
- tryShutdownTimerService();
+ Exception suppressedException = suppressThrowable(this::tryShutdownTimerService, null);
// stop all asynchronous checkpoint threads
- try {
- cancelables.close();
- shutdownAsyncThreads();
- } catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
- }
+ suppressedException = suppressThrowable(cancelables::close, suppressedException);
+ suppressedException = suppressThrowable(this::shutdownAsyncThreads, suppressedException);
// we must! perform this cleanup
- try {
- cleanup();
- } catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Error during cleanup of stream task", t);
- }
+ suppressedException = suppressThrowable(this::cleanup, suppressedException);
// if the operators were not disposed before, do a hard dispose
- disposeAllOperators(true);
+ try {
+ disposeAllOperators();
+ } catch (Exception t) {
+ suppressedException = ExceptionUtils.firstOrSuppressed(t, suppressedException);
Review comment:
Can we use `suppressThrowable` here too?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -686,27 +680,38 @@ private void shutdownAsyncThreads() throws Exception {
}
}
+ private Exception suppressThrowable(ThrowingRunnable<?> runnable, Exception suppressedException) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ // TODO: investigate why Throwable instead of Exception is used here.
+ Exception e = t instanceof Exception ? (Exception) t : new Exception("throwable", t);
Review comment:
The `"throwable", ` message argument is unnecessary; `new Exception(t)` is enough.
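
A small sketch of why the message can be dropped: the one-argument `Exception(Throwable cause)` constructor already uses `cause.toString()` as the detail message, so the wrapped exception still describes the original failure. The class and method names below are hypothetical and exist only for this illustration.

```java
final class WrapThrowableSketch {

    /** Returns t itself if it already is an Exception, otherwise wraps it without a custom message. */
    static Exception asException(Throwable t) {
        return t instanceof Exception ? (Exception) t : new Exception(t);
    }

    public static void main(String[] args) {
        Throwable error = new OutOfMemoryError("simulated");
        Exception wrapped = asException(error);
        // The one-argument constructor takes its detail message from cause.toString().
        System.out.println(wrapped.getMessage().equals(error.toString()));
        System.out.println(wrapped.getCause() == error);
    }
}
```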
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -611,27 +611,21 @@ protected void cleanUpInvoke() throws Exception {
Thread.interrupted();
// stop all timers and threads
- tryShutdownTimerService();
+ Exception suppressedException = suppressThrowable(this::tryShutdownTimerService, null);
// stop all asynchronous checkpoint threads
- try {
- cancelables.close();
- shutdownAsyncThreads();
- } catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down async checkpoint threads", t);
- }
+ suppressedException = suppressThrowable(cancelables::close, suppressedException);
+ suppressedException = suppressThrowable(this::shutdownAsyncThreads, suppressedException);
// we must! perform this cleanup
- try {
- cleanup();
- } catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Error during cleanup of stream task", t);
- }
+ suppressedException = suppressThrowable(this::cleanup, suppressedException);
// if the operators were not disposed before, do a hard dispose
- disposeAllOperators(true);
+ try {
+ disposeAllOperators();
+ } catch (Exception t) {
+ suppressedException = ExceptionUtils.firstOrSuppressed(t, suppressedException);
+ }
// release the output resources. this method should never fail.
if (operatorChain != null) {
Review comment:
I think the exceptions collected above can be lost if this block throws a new exception.
Again, why not use `suppressThrowable` here too?
To preserve the current behavior (exit immediately if this block fails), we can `throw suppressedException` right after it.
However, I don't think the current behavior is intentional.
The same applies to `mailboxProcessor.close` below.
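
One way to picture the alternative is the sketch below, where every cleanup step goes through the same helper and the accumulated exception is thrown once at the end, so no earlier failure is lost. This is an assumption-laden illustration, not the PR's code: `Step`, `runSuppressed`, and `cleanUp` are hypothetical names, and the lambdas in `main` merely stand in for the operator disposal, the output release, and `mailboxProcessor.close`.

```java
final class CleanupChainSketch {

    /** Hypothetical functional interface for a cleanup step that may fail. */
    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    /** Runs one step; a failure becomes the first exception or is suppressed under it. */
    static Exception runSuppressed(Step step, Exception original) {
        try {
            step.run();
        } catch (Exception e) {
            if (original == null || original == e) {
                return e;
            }
            original.addSuppressed(e);
        }
        return original;
    }

    /** Runs every step even if earlier ones failed, then reports all failures at once. */
    static void cleanUp(Step... steps) throws Exception {
        Exception failure = null;
        for (Step step : steps) {
            failure = runSuppressed(step, failure);
        }
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) {
        try {
            cleanUp(
                () -> { throw new IllegalStateException("disposeAllOperators"); },
                () -> { throw new java.io.IOException("releaseOutputs"); },
                () -> System.out.println("mailbox closed"));
        } catch (Exception e) {
            System.out.println(e + " suppressed=" + e.getSuppressed().length);
        }
    }
}
```

To keep the exit-immediately behavior for a particular step instead, the accumulated exception would be checked and thrown right after that step rather than after the loop.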
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -540,8 +540,8 @@ public final void invoke() throws Exception {
try {
cleanUpInvoke();
}
- catch (Throwable cleanUpException) {
- throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
+ catch (Exception cleanUpException) {
+ throw ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
Review comment:
I think we agreed during the offline discussion to preserve the types of
what is being caught (or am I wrong?).
We can preserve `Throwable` like this:
```
} catch (Throwable cleanUpException) {
    ExceptionUtils.rethrow(ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException));
}
```
I think this should be a separate `[hotfix]` commit.
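
To illustrate what preserving `Throwable` buys, here is a self-contained sketch. `firstOrSuppressed` and `rethrow` are local stand-ins modeled on the Flink utilities of the same name, so their exact behavior is an assumption that should be checked against the Flink source; `finishInvoke` is a hypothetical method that only mimics the shape of the `catch` block discussed above.

```java
final class PreserveThrowableSketch {

    /** Stand-in: keeps the earlier exception and attaches the newer one as suppressed. */
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Stand-in: rethrows unchecked throwables as-is and wraps checked ones. */
    static void rethrow(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        throw new RuntimeException(t);
    }

    /** Hypothetical shape of the end of invoke(): clean up, then surface the original failure. */
    static void finishInvoke(Exception invokeException, Runnable cleanUp) {
        try {
            cleanUp.run();
        } catch (Throwable cleanUpException) {
            // Catching Throwable means an Error from cleanup is also recorded, not only Exceptions.
            rethrow(firstOrSuppressed(cleanUpException, invokeException));
        }
        rethrow(invokeException);
    }

    public static void main(String[] args) {
        try {
            finishInvoke(new IllegalStateException("invoke failed"), () -> { throw new Error("cleanup failed"); });
        } catch (Throwable t) {
            System.out.println(t + " suppressed=" + t.getSuppressed().length);
        }
    }
}
```

With `catch (Exception ...)` in place of `catch (Throwable ...)`, an `Error` thrown during cleanup would bypass the handler, and the original invoke failure would not be attached to it.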