curcur commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r437299439



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during disposal of stream operator.", e);
+                                               disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);

Review comment:
       > Isn't `e` and thus `newException` always non-null?
   
   Yep, I agree. I think the assumption here is that you only suppress an
   exception when you have actually caught one. But I guess it doesn't hurt
   to add the check.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during disposal of stream operator.", e);
+                                               disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);

Review comment:
       > Isn't `e` and thus `newException` always non-null?
   
   Yep, I agree. I think the assumption here is that an exception is only
   suppressed when a real exception has been caught, and
   `checkNotNull(newException, "newException");` explicitly requires
   `newException` to be non-null.
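
   To make the contract under discussion concrete, here is a minimal standalone sketch of the "first or suppressed" pattern. It is an approximation written for illustration, not Flink's actual `ExceptionUtils` source: the class name `FirstOrSuppressedSketch` is hypothetical, and `Objects.requireNonNull` stands in for `checkNotNull`.

```java
import java.util.Objects;

public class FirstOrSuppressedSketch {

    // Keeps the earliest exception as the primary one; later exceptions are
    // attached to it as suppressed. The new exception must never be null.
    public static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        Objects.requireNonNull(newException, "newException");
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    public static void main(String[] args) {
        Exception collected = null;
        collected = firstOrSuppressed(new IllegalStateException("first"), collected);
        collected = firstOrSuppressed(new IllegalStateException("second"), collected);
        System.out.println(collected.getMessage());           // prints "first"
        System.out.println(collected.getSuppressed().length); // prints 1
    }
}
```

   Inside a `catch (Exception e)` block `e` can never be null, so the null check only guards against misuse from other call sites.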

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -692,6 +693,7 @@ private void shutdownAsyncThreads() throws Exception {
         */
        private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                if (operatorChain != null && !disposedOperators) {
+                       Exception disposalException = null;
                        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                                StreamOperator<?> operator = operatorWrapper.getStreamOperator();
                                if (!logOnlyErrors) {
                                if (!logOnlyErrors) {

Review comment:
       > I think this `!logOnlyErrors` branch could also gain from your change.
   > Currently, it disposes operators until 1st failure.
   
   Yes, I agree. However, I was a little hesitant to make that change for
   safety reasons. It changes the behavior of the code, such as which
   exception is thrown and how the exit procedure runs.
   
   For example, in some cases the system might prefer to crash fast as soon
   as the first disposal error occurs (this is just an example; I am not
   saying that is actually the case).
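
   The behavioral difference at stake can be sketched as follows. This is a simplified illustration, not the `StreamTask` code: `Disposable` is a hypothetical stand-in for a stream operator, and both method names are made up for the example.

```java
import java.util.List;

public class DisposalStrategiesSketch {

    /** Hypothetical stand-in for a stream operator; not a Flink type. */
    public interface Disposable {
        void dispose() throws Exception;
    }

    /** Fail fast: the first failure aborts the loop, so later operators are never disposed. */
    public static void disposeUntilFirstFailure(List<Disposable> operators) throws Exception {
        for (Disposable operator : operators) {
            operator.dispose();
        }
    }

    /** Dispose every operator, then rethrow the first failure with later ones suppressed. */
    public static void disposeAllThenThrow(List<Disposable> operators) throws Exception {
        Exception disposalException = null;
        for (Disposable operator : operators) {
            try {
                operator.dispose();
            } catch (Exception e) {
                if (disposalException == null) {
                    disposalException = e;
                } else if (e != disposalException) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    disposalException.addSuppressed(e);
                }
            }
        }
        if (disposalException != null) {
            throw disposalException;
        }
    }
}
```

   With three operators where the first and last fail, the first variant stops after one `dispose()` call, while the second calls all three and reports the last failure as suppressed. That is the change in exit behavior the comment above is cautious about.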

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during disposal of stream operator.", e);
+                                               disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);
                                        }
                                }
                        }
                        disposedOperators = true;
+                       if (disposalException != null) {
+                               throw disposalException;

Review comment:
       That's true, but what happens to the suppressed exception? I guess you
   still need to throw it, otherwise the exception is swallowed.
   
   cc @pnowojski
   
   Is `disposeAllOperators(true);` designed not to throw any exception and
   just force a shutdown? If that's the case, I guess the best way is simply
   to log that the disposal error is not the root cause.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
                        afterInvoke();

Review comment:
       This basically says we want to log the first disposal error encountered.
   
   In the catch block, we force disposal and clean up no matter what. So I
   guess `disposeAllOperators(true)` was intended not to throw exceptions, in
   which case the proposed option 3 (logging "disposal error is not the root
   cause") might be a solution.
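
   A rough sketch of what option 3 could look like, under stated assumptions: `Disposable`, `disposeAllLogOnly`, and this `invoke` are hypothetical names for illustration only, and a plain `List<String>` stands in for the logger. It is not the code in this PR.

```java
import java.util.List;

public class CleanupPathSketch {

    /** Hypothetical stand-in for a stream operator; not a Flink type. */
    public interface Disposable {
        void dispose() throws Exception;
    }

    /** Log-only disposal: never throws, records each failure in the given log. */
    public static void disposeAllLogOnly(List<Disposable> operators, List<String> log) {
        for (Disposable operator : operators) {
            try {
                operator.dispose();
            } catch (Exception e) {
                log.add("Error during disposal of stream operator (not the root cause): "
                        + e.getMessage());
            }
        }
    }

    /** Runs the work; on failure, cleans up without throwing and rethrows the original error. */
    public static void invoke(Runnable work, List<Disposable> operators, List<String> log) {
        try {
            work.run();
        } catch (RuntimeException rootCause) {
            disposeAllLogOnly(operators, log);
            throw rootCause;
        }
    }
}
```

   The point of this shape is that the caller always sees the original failure, while disposal errors remain visible in the log without replacing or decorating it.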




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]