curcur commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r437299439
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors)
throws Exception {
operator.dispose();
}
catch (Exception e) {
- LOG.error("Error during
disposal of stream operator.", e);
+ disposalException =
ExceptionUtils.firstOrSuppressed(e, disposalException);
Review comment:
> Isn't `e` and thus `newException` always non-null?
yep, i agree. I think the assumption here is you want to suppress an
exception only if you catch a real exception. But I guess no hurt to add the
check.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors)
throws Exception {
operator.dispose();
}
catch (Exception e) {
- LOG.error("Error during
disposal of stream operator.", e);
+ disposalException =
ExceptionUtils.firstOrSuppressed(e, disposalException);
Review comment:
> Isn't `e` and thus `newException` always non-null?
yep, i agree. I think the assumption here is you want to suppress an
exception only if you catch a real exception.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors)
throws Exception {
operator.dispose();
}
catch (Exception e) {
- LOG.error("Error during
disposal of stream operator.", e);
+ disposalException =
ExceptionUtils.firstOrSuppressed(e, disposalException);
Review comment:
> Isn't `e` and thus `newException` always non-null?
yep, i agree. I think the assumption here is suppressing an exception only
if catching a real exception. And the `checkNotNull(newException,
"newException");'
explicitly requires the newException non-null.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors)
throws Exception {
operator.dispose();
}
catch (Exception e) {
- LOG.error("Error during
disposal of stream operator.", e);
+ disposalException =
ExceptionUtils.firstOrSuppressed(e, disposalException);
Review comment:
> Isn't `e` and thus `newException` always non-null?
yep, i agree. I think the assumption here is suppressing an exception only
if catching a real exception. And the `checkNotNull(newException,
"newException");`
explicitly requires the newException non-null.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -692,6 +693,7 @@ private void shutdownAsyncThreads() throws Exception {
*/
private void disposeAllOperators(boolean logOnlyErrors) throws
Exception {
if (operatorChain != null && !disposedOperators) {
+ Exception disposalException = null;
for (StreamOperatorWrapper<?, ?> operatorWrapper :
operatorChain.getAllOperators(true)) {
StreamOperator<?> operator =
operatorWrapper.getStreamOperator();
if (!logOnlyErrors) {
Review comment:
> I think this `!logOnlyErrors` branch could also gain from your change.
> Currently, it disposes operators until 1st failure.
Yes, I agree. However, I was a little afraid to make the change from the
safety perspective. It changes the behavior of the code, like what exception to
throw, and the exit procedure.
For example, in some cases, the system is preferring a faster crash as soon
as the first disposal occurs (it is just an example, not saying it would be
like that).
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors)
throws Exception {
operator.dispose();
}
catch (Exception e) {
- LOG.error("Error during
disposal of stream operator.", e);
+ disposalException =
ExceptionUtils.firstOrSuppressed(e, disposalException);
}
}
}
disposedOperators = true;
+ if (disposalException != null) {
+ throw disposalException;
Review comment:
That's true, but what gonna happen to the suppressed exception? You
still need to throw it otherwise the exception is swallowed I guess.
cc @pnowojski
Is `disposeAllOperators(true);` designed not to throw any exception just
force shut down? If that's the case, I guess the best way is just logging
saying that `dispose error is not the rootcause?'
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
afterInvoke();
Review comment:
This basically says we want to log the first encountered disposal error.
In the catch block, we do an enforce one and clean up no matter what. So I
guess `disposeAllOperators(true)` was intended to not throw exceptions, which I
guess the proposed option 3 (logging "disposal error is not the root cause")
might be a solution.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]