pnowojski commented on a change in pull request #15425:
URL: https://github.com/apache/flink/pull/15425#discussion_r603854862
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -97,6 +97,11 @@ public void init() throws Exception {
inputProcessor = new StreamOneInputProcessor<>(input, output,
operatorChain);
}
+ @Override
+ protected void finishTask() throws Exception {
+ stopMailboxProcessor();
Review comment:
I think I agree with @kezhuw, but I'm worried this is just a bug and that it
would break the clean shutdown of `stop-with-savepoint WITH drain`. As far as I can
tell, `cancelables.close()` closes some state backend resources. If they
are closed, I wouldn't be surprised if operators/user functions started
failing during the clean shutdown. Calling
`mailboxProcessor.allActionsCompleted()` here should be enough.
This is probably missing test coverage.
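To illustrate the concern, here is a minimal toy model. It is not Flink code: `ToyMailbox`, `ToyResources`, `closeOperators()` and `cancelTask()` are hypothetical stand-ins for the mailbox processor, `cancelables`, and the later shutdown steps. It only sketches why a clean finish should stop the mailbox loop without closing resources that the remaining shutdown steps may still need.

```java
/** Toy model only; every name here is a hypothetical stand-in, not Flink's API. */
class FinishTaskSketch {

    /** Stand-in for the mailbox processor: tracks only whether the loop should keep running. */
    static class ToyMailbox {
        private volatile boolean running = true;

        void allActionsCompleted() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    /** Stand-in for `cancelables`: resources that fail when used after close. */
    static class ToyResources implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }

        void use() {
            if (closed) {
                throw new IllegalStateException("resource already closed");
            }
        }
    }

    final ToyMailbox mailbox = new ToyMailbox();
    final ToyResources resources = new ToyResources();

    /** Clean finish: stop the loop, leave resources open for the rest of the shutdown. */
    void finishTask() {
        mailbox.allActionsCompleted();
    }

    /** Cancellation: stop the loop and release resources immediately. */
    void cancelTask() {
        mailbox.allActionsCompleted();
        resources.close();
    }

    /** Assumed later shutdown step that still touches the resources. */
    void closeOperators() {
        resources.use();
    }

    public static void main(String[] args) {
        FinishTaskSketch task = new FinishTaskSketch();
        task.finishTask();
        task.closeOperators(); // succeeds because the resources are still open
        System.out.println("clean finish completed, resources closed: " + task.resources.isClosed());
    }
}
```

In this model, calling `cancelTask()` before `closeOperators()` would throw, which is the kind of failure I would expect during a drain if `finishTask()` also closed the resources.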
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -730,12 +730,7 @@ public final void cancel() throws Exception {
(unusedResult, unusedError) -> {
// WARN: the method is called from the task thread but the callback
// can be invoked from a different thread
- mailboxProcessor.allActionsCompleted();
- try {
- cancelables.close();
- } catch (IOException e) {
- throw new CompletionException(e);
- }
+ stopMailboxProcessor();
Review comment:
After your change, `stopMailboxProcessor()` has an implicit contract that
it must be thread safe, and because of that I'm not sure that extracting this
code into a separate method as it is is a good idea. There is a `WARN` comment that
you have missed.
(Previously, the thread-safety requirement was a private implementation detail
of this lambda. Now you are moving it into the `StreamTask` interface.)
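A minimal sketch of what that contract means in practice, using only `java.util.concurrent` and hypothetical names (`ThreadSafeStopSketch`, `stopFromAnyThread()`, and `releaseCount()` are not part of Flink): a callback registered with `whenComplete` runs on whichever thread completes the future, so anything it calls has to tolerate concurrent and repeated invocation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only; names are hypothetical and this is not the StreamTask implementation. */
class ThreadSafeStopSketch {

    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicInteger releaseCount = new AtomicInteger();

    /**
     * May be called from any thread, any number of times.
     * compareAndSet lets exactly one caller perform the release.
     */
    void stopFromAnyThread() {
        if (stopped.compareAndSet(false, true)) {
            releaseCount.incrementAndGet(); // stands in for the actual release work
        }
    }

    /** Number of times the release work actually ran. */
    int releaseCount() {
        return releaseCount.get();
    }

    /**
     * Registered on the caller's thread, but the callback runs on
     * whichever thread completes the future.
     */
    CompletableFuture<Void> cancel(CompletableFuture<Void> terminationFuture) {
        return terminationFuture.whenComplete(
                (unusedResult, unusedError) -> stopFromAnyThread());
    }

    public static void main(String[] args) {
        ThreadSafeStopSketch task = new ThreadSafeStopSketch();
        CompletableFuture<Void> termination = new CompletableFuture<>();
        CompletableFuture<Void> done = task.cancel(termination);

        // Complete the future from a pool thread, so the callback runs off the main thread.
        CompletableFuture.runAsync(() -> termination.complete(null)).join();
        done.join();

        task.stopFromAnyThread(); // a second call from the main thread is a no-op
        System.out.println("release ran " + task.releaseCount() + " time(s)");
    }
}
```

Once such a method is visible to subclasses, this requirement is no longer local to one lambda, so it would at least need to be documented on the method itself.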