pnowojski commented on a change in pull request #13000:
URL: https://github.com/apache/flink/pull/13000#discussion_r462289373
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
protected void afterInvoke() throws Exception {
LOG.debug("Finished task {}", getName());
+ getCompletionFuture().exceptionally(unused -> null).join();
Review comment:
1.
> It's already reported in SourceStreamTask.processInput using
> mailboxProcessor.reportThrowable
I think you are right, it should be correct now.
But I would rephrase it as "it SHOULD be reported in...". What about
handling our own bugs, when it isn't reported? Could we make this less error
prone? For example, make sure that no exception can ever reach
`StreamTask#getCompletionFuture().join()`? And if something did reach it
here, it would be the equivalent of an `IllegalStateException`?
Maybe `SourceStreamTask#getCompletionFuture()` could wrap the future
somehow? `SourceStreamTask` owns the logic for handling/forwarding
exceptions, so it would be a slightly better place to ignore exceptions
than `StreamTask`?
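To make the suggestion concrete, here is a rough sketch using plain `CompletableFuture`, not the actual Flink classes. The class and method names (`CompletionFutureSketch`, `failedFuture`, `wrapIgnoringFailure`, `strictJoin`) are hypothetical and only illustrate the idea: the owner of the exception handling wraps the future so failures it has already reported are swallowed, and the base-class `join()` treats anything that still reaches it as an `IllegalStateException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionFutureSketch {

    /** Hypothetical stand-in for a completion future that finished with a failure. */
    static CompletableFuture<Void> failedFuture() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("source thread failed"));
        return future;
    }

    /**
     * Hypothetical owner-side wrapper (the role suggested for SourceStreamTask):
     * the failure was already reported elsewhere, so hide it from callers.
     */
    static CompletableFuture<Void> wrapIgnoringFailure(CompletableFuture<Void> future) {
        return future.exceptionally(unused -> null);
    }

    /**
     * Strict join (the role suggested for StreamTask): any exception that
     * still arrives here is treated as a bug in the reporting logic.
     */
    static void strictJoin(CompletableFuture<Void> future) {
        try {
            future.join();
        } catch (CompletionException e) {
            throw new IllegalStateException(
                    "Unreported failure reached join()", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Wrapped future: the failure is swallowed, join() returns normally.
        strictJoin(wrapIgnoringFailure(failedFuture()));

        // Unwrapped future: the failure surfaces as an IllegalStateException.
        try {
            strictJoin(failedFuture());
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Compared to calling `exceptionally(unused -> null)` directly in `afterInvoke()`, this keeps the decision to ignore failures next to the code that reports them, at the cost of one more override.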
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
protected void afterInvoke() throws Exception {
LOG.debug("Finished task {}", getName());
+ getCompletionFuture().exceptionally(unused -> null).join();
Review comment:
ok, let's keep it as it is.