pnowojski commented on a change in pull request #15425:
URL: https://github.com/apache/flink/pull/15425#discussion_r604613973
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -730,12 +730,7 @@ public final void cancel() throws Exception {
(unusedResult, unusedError) -> {
// WARN: the method is called from the task
thread but the callback
// can be invoked from a different thread
- mailboxProcessor.allActionsCompleted();
- try {
- cancelables.close();
- } catch (IOException e) {
- throw new CompletionException(e);
- }
+ stopMailboxProcessor();
Review comment:
> Not sure if I fully understand the concern. Can you help me understand
more? Previously it is possible that the logic in stopMailboxProcessor is run
in a different thread from the mailbox thread, e.g. the SourceThread in
SourceStreamTask. So supposedly the stopMailboxProcessor logic should already
be thread safe. And expose it to subclasses should also be safe. Is that the
correct?
yes, it's correct.
But previously that was implementation detail of the `cancel()` method. By
promoting this code to a separate `protected` method, you are making it a part
of the `StreamTask` interface. That's the part I wasn't sure about. But I
believe if you fix the first issue with not calling `cancelables.close()`, this
change here can be safely reverted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]