rkhachatryan commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r440237448



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -592,7 +592,7 @@ protected void afterInvoke() throws Exception {
 
                // make an attempt to dispose the operators such that failures 
in the dispose call
                // still let the computation fail
-               disposeAllOperators(false);
+               disposeAllOperators();
                disposedOperators = true;

Review comment:
       This line is now unnecessary because `disposedOperators` is updated in 
`disposeAllOperators` before throwing an exception.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -686,27 +680,38 @@ private void shutdownAsyncThreads() throws Exception {
                }
        }
 
+       private Exception suppressThrowable(ThrowingRunnable<?> runnable, 
Exception suppressedException) {

Review comment:
       1. method name `suppressThrowable` doesn't say that it runs something. 
`runSuppressed`?
   1. argument name `suppressedException` is a bit misleading because it is `e` 
that will be suppressed. `originalException`?
   1. `@Nullable` for the 2nd argument?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -611,27 +611,21 @@ protected void cleanUpInvoke() throws Exception {
                Thread.interrupted();
 
                // stop all timers and threads
-               tryShutdownTimerService();
+               Exception suppressedException = 
suppressThrowable(this::tryShutdownTimerService, null);
 
                // stop all asynchronous checkpoint threads
-               try {
-                       cancelables.close();
-                       shutdownAsyncThreads();
-               } catch (Throwable t) {
-                       // catch and log the exception to not replace the 
original exception
-                       LOG.error("Could not shut down async checkpoint 
threads", t);
-               }
+               suppressedException = suppressThrowable(cancelables::close, 
suppressedException);
+               suppressedException = 
suppressThrowable(this::shutdownAsyncThreads, suppressedException);
 
                // we must! perform this cleanup
-               try {
-                       cleanup();
-               } catch (Throwable t) {
-                       // catch and log the exception to not replace the 
original exception
-                       LOG.error("Error during cleanup of stream task", t);
-               }
+               suppressedException = suppressThrowable(this::cleanup, 
suppressedException);
 
                // if the operators were not disposed before, do a hard dispose
-               disposeAllOperators(true);
+               try {
+                       disposeAllOperators();
+               } catch (Exception t) {
+                       suppressedException = 
ExceptionUtils.firstOrSuppressed(t, suppressedException);

Review comment:
       Can we use `suppressThrowable` here too?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -686,27 +680,38 @@ private void shutdownAsyncThreads() throws Exception {
                }
        }
 
+       private Exception suppressThrowable(ThrowingRunnable<?> runnable, 
Exception suppressedException) {
+               try {
+                       runnable.run();
+               } catch (Throwable t) {
+                       // TODO: investigate why Throwable instead of Exception 
is used here.
+                       Exception e = t instanceof Exception ? (Exception) t : 
new Exception("throwable", t);

Review comment:
       `"throwable", ` is unnecessary.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -611,27 +611,21 @@ protected void cleanUpInvoke() throws Exception {
                Thread.interrupted();
 
                // stop all timers and threads
-               tryShutdownTimerService();
+               Exception suppressedException = 
suppressThrowable(this::tryShutdownTimerService, null);
 
                // stop all asynchronous checkpoint threads
-               try {
-                       cancelables.close();
-                       shutdownAsyncThreads();
-               } catch (Throwable t) {
-                       // catch and log the exception to not replace the 
original exception
-                       LOG.error("Could not shut down async checkpoint 
threads", t);
-               }
+               suppressedException = suppressThrowable(cancelables::close, 
suppressedException);
+               suppressedException = 
suppressThrowable(this::shutdownAsyncThreads, suppressedException);
 
                // we must! perform this cleanup
-               try {
-                       cleanup();
-               } catch (Throwable t) {
-                       // catch and log the exception to not replace the 
original exception
-                       LOG.error("Error during cleanup of stream task", t);
-               }
+               suppressedException = suppressThrowable(this::cleanup, 
suppressedException);
 
                // if the operators were not disposed before, do a hard dispose
-               disposeAllOperators(true);
+               try {
+                       disposeAllOperators();
+               } catch (Exception t) {
+                       suppressedException = 
ExceptionUtils.firstOrSuppressed(t, suppressedException);
+               }
 
                // release the output resources. this method should never fail.
                if (operatorChain != null) {

Review comment:
       I think the above exceptions can be lost if a new exception is thrown by 
this block.
   
   Again, why not to use `suppressThrowable` here too? 
   To preserve the current behavior (exit immediately if this block fails) we 
can `throw suppressedException` right after it.
   However, I don't think that the current behavior is intentional.
   
   The same about `mailboxProcessor.close` below.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -540,8 +540,8 @@ public final void invoke() throws Exception {
                        try {
                                cleanUpInvoke();
                        }
-                       catch (Throwable cleanUpException) {
-                               throw (Exception) 
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
+                       catch (Exception cleanUpException) {
+                               throw 
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);

Review comment:
       I think we agreed during the offline discussion to preserve the types of 
what is being caught (or am I wrong?).
   
   We can preserve `Throwable` like this:
   ```
   } catch (Throwable cleanUpException) {
       
ExceptionUtils.rethrow(ExceptionUtils.firstOrSuppressed(cleanUpException, 
invokeException));
   ```
   
   I think this should be a separate `[hotfix]` commit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to