becketqin commented on a change in pull request #15425:
URL: https://github.com/apache/flink/pull/15425#discussion_r604607406
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -97,6 +97,11 @@ public void init() throws Exception {
inputProcessor = new StreamOneInputProcessor<>(input, output,
operatorChain);
}
+ @Override
+ protected void finishTask() throws Exception {
+ stopMailboxProcessor();
Review comment:
@kezhuw @pnowojsk Thanks for the comments. I had misunderstood the usage
of cancelables. From what Piotr describes, it seems to me that the cancelables
should only be closed in the common cleanup phase, along with operator disposal.
`cleanupInvoke` already does that.
The most counter-intuitive part of the procedure here is that
`StreamTask.invoke()` throws `CancelTaskException` upon cancelation without
going through the cleanup logic... If the reason behind this is to allow fast
shutdown when a job running in a dedicated cluster is canceled, so that no
cleanup is needed because the JVM will exit, maybe we should have a flag
like `SKIP_CLEANUP_ON_CANCELATION`.
For this patch, I'll just remove the `cancelable.close()` section here.
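To make the flag idea concrete, here is a minimal sketch of the control flow I have in mind. It is not Flink code: `CleanupOnCancelSketch`, `CanceledException`, and the `skipCleanupOnCancelation` parameter are hypothetical stand-ins, and the real `StreamTask` lifecycle is more involved.

```java
// Hypothetical sketch: none of these names are Flink APIs.
public class CleanupOnCancelSketch {

    // Stand-in for Flink's CancelTaskException.
    static class CanceledException extends RuntimeException {}

    // Runs the task body, then the common cleanup phase, unless the task was
    // canceled and skipCleanupOnCancelation (the hypothetical flag) is set.
    // Returns true if the cleanup phase ran. A real task would rethrow the
    // cancelation exception; it is swallowed here to keep the sketch short.
    static boolean invoke(Runnable body, boolean skipCleanupOnCancelation) {
        boolean canceled = false;
        try {
            body.run();
        } catch (CanceledException e) {
            canceled = true;
        }
        if (canceled && skipCleanupOnCancelation) {
            // Fast shutdown: skip cleanup and rely on the JVM exiting.
            return false;
        }
        // Common cleanup phase: this is where cancelables would be closed
        // and operators disposed.
        return true;
    }

    public static void main(String[] args) {
        Runnable normal = () -> {};
        Runnable canceled = () -> { throw new CanceledException(); };
        System.out.println("normal run, cleanup ran: " + invoke(normal, true));
        System.out.println("canceled, flag off, cleanup ran: " + invoke(canceled, false));
        System.out.println("canceled, flag on, cleanup ran: " + invoke(canceled, true));
    }
}
```

With the flag off, cancelation goes through the same cleanup as a normal finish, so only the explicit fast-shutdown case would skip it.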