rkhachatryan commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r437270735



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during disposal of stream operator.", e);
+                                               disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);

Review comment:
       Yes, you are right.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -692,6 +693,7 @@ private void shutdownAsyncThreads() throws Exception {
         */
        private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                if (operatorChain != null && !disposedOperators) {
+                       Exception disposalException = null;
                        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                                StreamOperator<?> operator = operatorWrapper.getStreamOperator();
                                if (!logOnlyErrors) {

Review comment:
       I don't think failing fast was intentional here. In any case, you are 
already changing this behavior for the `logOnlyErrors == false` case.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during disposal of stream operator.", e);
+                                               disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);
                                        }
                                }
                        }
                        disposedOperators = true;
+                       if (disposalException != null) {
+                               throw disposalException;

Review comment:
       > what gonna happen to the suppressed exception?
   
   It will be logged by
   ```
   LOG.error("Invoke Error", invokeException);
   ```
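
To illustrate why the suppressed exception is not lost, here is a minimal, self-contained sketch of the collect-then-rethrow pattern from the diff. It is not the actual `StreamTask` code: the `DisposalSketch` class, the `Disposable` interface, and the local `firstOrSuppressed` helper are hypothetical stand-ins, and the helper only assumes the "keep the first exception, attach later ones via `addSuppressed`" behavior discussed above.

```java
import java.util.ArrayList;
import java.util.List;

public class DisposalSketch {

    /** Hypothetical stand-in for an operator whose disposal may fail. */
    interface Disposable {
        void dispose() throws Exception;
    }

    /**
     * Local stand-in for the first-or-suppressed idea (an assumption, not Flink's code):
     * the first exception stays primary, later ones are attached as suppressed.
     */
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Disposes every element even if some fail, then rethrows the collected failure. */
    static void disposeAll(List<Disposable> operators) throws Exception {
        Exception disposalException = null;
        for (Disposable operator : operators) {
            try {
                operator.dispose();
            } catch (Exception e) {
                // Do not fail fast: remember the failure and keep disposing the rest.
                disposalException = firstOrSuppressed(e, disposalException);
            }
        }
        if (disposalException != null) {
            throw disposalException;
        }
    }

    public static void main(String[] args) {
        List<Disposable> operators = new ArrayList<>();
        operators.add(() -> { throw new IllegalStateException("first failure"); });
        operators.add(() -> { });
        operators.add(() -> { throw new IllegalStateException("second failure"); });
        try {
            disposeAll(operators);
        } catch (Exception e) {
            // The stack trace of the primary exception includes its suppressed ones.
            e.printStackTrace();
        }
    }
}
```

Because the later failures travel with the primary exception, whoever logs the primary one with its stack trace also sees them, provided the logging backend prints suppressed exceptions (as `Throwable.printStackTrace` does).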

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
                        afterInvoke();

Review comment:
       There may be no root cause at all, just a normal shutdown.
   I think changing the log message (see the comment below) will make this clearer.



