rkhachatryan commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r438652781



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();
 
+               Throwable suppressedThrowable = null;
                // stop all asynchronous checkpoint threads
                try {
                        cancelables.close();
                        shutdownAsyncThreads();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Could not shut down async checkpoint threads", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);

Review comment:
       It would be nice to preserve the `"Could not shut down async checkpoint threads"`
message (by wrapping `t`), but the stack trace is probably enough.
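
A minimal sketch of what wrapping `t` could look like, so that the old log message travels with the collected exception. `recordFailure` is a hypothetical helper, and `firstOrSuppressed` here is a local stand-in for the `ExceptionUtils` call in the diff (assumed to keep the first throwable and add later ones as suppressed), declared only so the example compiles on its own:

```java
final class WrapBeforeSuppressSketch {

    /** Local stand-in (assumption) for the helper used in the diff. */
    static Throwable firstOrSuppressed(Throwable newThrowable, Throwable previous) {
        if (previous == null) {
            return newThrowable;
        }
        previous.addSuppressed(newThrowable);
        return previous;
    }

    /** Hypothetical helper: wraps the failure so the step description is kept as the message. */
    static Throwable recordFailure(String message, Throwable t, Throwable previous) {
        return firstOrSuppressed(new Exception(message, t), previous);
    }

    public static void main(String[] args) {
        Throwable collected = null;
        try {
            throw new IllegalStateException("executor still busy");
        } catch (Throwable t) {
            collected = recordFailure("Could not shut down async checkpoint threads", t, collected);
        }
        // The stack trace now shows the step message first and the original failure as its cause.
        collected.printStackTrace();
    }
}
```

The trade-off is one extra wrapper per failed step; the original throwable stays reachable through `getCause()`.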

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();

Review comment:
       This call has the same try-catch-log inside. Can we apply the same 
approach to it?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
                        afterInvoke();
                }
                catch (Exception invokeException) {
+                       LOG.error("Error while running task " + getTaskNameWithSubtaskAndId(), invokeException);

Review comment:
       nit: can we use `{}` instead of string concatenation here?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();
 
+               Throwable suppressedThrowable = null;
                // stop all asynchronous checkpoint threads
                try {
                        cancelables.close();
                        shutdownAsyncThreads();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Could not shut down async checkpoint threads", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
                }
 
                // we must! perform this cleanup
                try {
                        cleanup();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Error during cleanup of stream task", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
                }
 
                // if the operators were not disposed before, do a hard dispose
-               disposeAllOperators(true);
+               try {
+                       disposeAllOperators(true);
+               } catch (Throwable t) {
+                       // catch and suppress all the disposal exceptions
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);

Review comment:
       ditto about comment and error message.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +714,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {

Review comment:
       This is a bit inconsistent with other changes where we catch 
`Throwable`, not `Exception`.
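
To illustrate why the difference matters in a dispose loop, here is a small standalone sketch (hypothetical class and method names, plain `Runnable`s standing in for operators): with `catch (Exception e)`, an `Error` thrown by one disposer escapes the loop and the remaining disposers never run, whereas `catch (Throwable t)` lets the loop continue.

```java
import java.util.Arrays;
import java.util.List;

final class CatchScopeSketch {

    /** Catches only Exception: an Error thrown by a disposer aborts the loop. */
    static void disposeCatchingException(List<Runnable> disposers) {
        for (Runnable disposer : disposers) {
            try {
                disposer.run();
            } catch (Exception e) {
                System.out.println("caught " + e);
            }
        }
    }

    /** Catches Throwable: every disposer is attempted, whatever the failure type. */
    static void disposeCatchingThrowable(List<Runnable> disposers) {
        for (Runnable disposer : disposers) {
            try {
                disposer.run();
            } catch (Throwable t) {
                System.out.println("caught " + t);
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> disposers = Arrays.asList(
            () -> { throw new AssertionError("simulated Error in dispose()"); },
            () -> System.out.println("second operator disposed"));

        disposeCatchingThrowable(disposers);
        try {
            disposeCatchingException(disposers);
        } catch (Error e) {
            System.out.println("loop aborted by " + e);
        }
    }
}
```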

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();
 
+               Throwable suppressedThrowable = null;
                // stop all asynchronous checkpoint threads
                try {
                        cancelables.close();
                        shutdownAsyncThreads();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Could not shut down async checkpoint threads", t);
+                       // catch and suppress the exception to not replace the original exception

Review comment:
       nit: this comment doesn't seem very useful. I'd remove it.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();
 
+               Throwable suppressedThrowable = null;
                // stop all asynchronous checkpoint threads
                try {
                        cancelables.close();
                        shutdownAsyncThreads();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Could not shut down async checkpoint threads", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
                }
 
                // we must! perform this cleanup
                try {
                        cleanup();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Error during cleanup of stream task", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
                }
 
                // if the operators were not disposed before, do a hard dispose
-               disposeAllOperators(true);
+               try {
+                       disposeAllOperators(true);

Review comment:
       I guess we can remove the `logOnlyErrors` argument now, as we are catching
exceptions.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -646,10 +653,14 @@ protected void cleanUpInvoke() throws Exception {
                try {
                        channelIOExecutor.shutdown();
                } catch (Throwable t) {
-                       LOG.error("Error during shutdown the channel state unspill executor", t);
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
                }
 
                mailboxProcessor.close();
+
+               if (suppressedThrowable != null) {
+                       throw (Exception) suppressedThrowable;

Review comment:
       I think `suppressedThrowable` doesn't have to be an `Exception` (it can be an
`Error`), so this cast can fail.
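
A rough sketch of the problem and one possible way around it. `rethrow` is a hypothetical helper written for this example, not an existing utility: it rethrows `Error`s and `Exception`s as they are and wraps anything else, so no blind cast is needed.

```java
final class RethrowSketch {

    /** Hypothetical helper: rethrows without assuming the Throwable is an Exception. */
    static void rethrow(Throwable t) throws Exception {
        if (t instanceof Error) {
            throw (Error) t;
        } else if (t instanceof Exception) {
            throw (Exception) t;
        } else {
            // Direct subclasses of Throwable are rare; wrap them to satisfy the signature.
            throw new Exception(t.getMessage(), t);
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable collected = new OutOfMemoryError("simulated failure during cleanup");

        // The blind cast compiles but fails at runtime when an Error was collected.
        try {
            Exception unused = (Exception) collected;
            System.out.println("cast succeeded: " + unused);
        } catch (ClassCastException e) {
            System.out.println("blind cast failed: " + e.getClass().getSimpleName());
        }

        // The helper surfaces the original Error instead.
        try {
            rethrow(collected);
        } catch (Error e) {
            System.out.println("rethrown as original type: " + e);
        }
    }
}
```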

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();
 
+               Throwable suppressedThrowable = null;
                // stop all asynchronous checkpoint threads
                try {
                        cancelables.close();
                        shutdownAsyncThreads();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Could not shut down async checkpoint threads", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
                }
 
                // we must! perform this cleanup
                try {
                        cleanup();
                } catch (Throwable t) {
-                       // catch and log the exception to not replace the original exception
-                       LOG.error("Error during cleanup of stream task", t);
+                       // catch and suppress the exception to not replace the original exception
+                       suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);

Review comment:
       ditto about the comment and losing error message.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -613,25 +614,31 @@ protected void cleanUpInvoke() throws Exception {
                // stop all timers and threads
                tryShutdownTimerService();
 
+               Throwable suppressedThrowable = null;

Review comment:
       So we now have 4-5 similar try-catch blocks one after another.
   
   I guess we can't use Guava's `Closer` because we want to handle any type of
   exception, right? And not `CloseableRegistry`, because it closes silently.
   
   Maybe we can emulate it by keeping a collection of closeables and closing them
   in a loop:
   ```
   List<ThrowingRunnable<?>> runs = asList(
       cancelables::close,
       this::shutdownAsyncThreads,
       this::cleanup,
       this::disposeAllOperators,
       ...);
   Throwable suppressedThrowable = null;
   for (ThrowingRunnable<?> run: runs) {
       try {
           run.run();
       } catch (Throwable t) {
           suppressedThrowable = ExceptionUtils.firstOrSuppressed(t, suppressedThrowable);
       }
   }
   ```
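
For reference, a self-contained version of the loop above that can be run outside the code base. This is only a sketch: `ThrowingAction` and the local `firstOrSuppressed` are stand-ins declared here so it compiles alone (the latter assumed to keep the first throwable and attach later ones as suppressed), and `runAll` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

final class CleanupLoopSketch {

    /** Stand-in (assumption) for a runnable that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }

    /** Local stand-in: keeps the first failure and attaches later ones as suppressed. */
    static Throwable firstOrSuppressed(Throwable newThrowable, Throwable previous) {
        if (previous == null) {
            return newThrowable;
        }
        previous.addSuppressed(newThrowable);
        return previous;
    }

    /** Runs every action even if earlier ones fail; returns the combined failure or null. */
    static Throwable runAll(List<ThrowingAction> actions) {
        Throwable suppressedThrowable = null;
        for (ThrowingAction action : actions) {
            try {
                action.run();
            } catch (Throwable t) {
                suppressedThrowable = firstOrSuppressed(t, suppressedThrowable);
            }
        }
        return suppressedThrowable;
    }

    public static void main(String[] args) {
        List<ThrowingAction> actions = Arrays.asList(
            () -> { throw new IllegalStateException("close failed"); },
            () -> System.out.println("cleanup ran"),
            () -> { throw new java.io.IOException("dispose failed"); });

        Throwable failure = runAll(actions);
        System.out.println(failure + " suppressed=" + Arrays.toString(failure.getSuppressed()));
    }
}
```

The order of the list is the order of cleanup, so the first failing step becomes the primary exception and later failures show up under it as suppressed.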
   





