sunhaibotb commented on a change in pull request #10151: [FLINK-14231] Handle
the processing-time timers before closing operator to properly support endInput
URL: https://github.com/apache/flink/pull/10151#discussion_r349540953
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1395,6 +1381,114 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
}
}
+ /**
+ * This class executes {@link StreamOperator#close()} of all operators in the chain
+ * of this {@link StreamTask} one by one through the mailbox thread. Closing happens
+ * from <b>head to tail</b> operator in the chain, contrary to {@link StreamOperator#open()}
+ * which happens <b>tail to head</b> (see {@link #openAllOperators()}.
Review comment:
For `StreamOperatorWrapper`, I don't think it is simpler or better than the
current implementation. For example, every `StreamOperatorWrapper` would also
need to store its `ProcessingTimeService` instance, and the operations of
`endInput`, `close`, and waiting for the executing timers to finish would still
need to be put into the mailbox. One of the reasons is that a timer may be
registered in `endInput`, and that timer needs to be executed by the main
thread, so we can't block the main thread.
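To illustrate the point about not blocking the main thread, here is a minimal single-threaded sketch. It is a toy model, not Flink code: `MailboxCloseSketch`, `registerTimer`, and `closeWhenTimersDone` are hypothetical names, and the timer callback is delivered as a mail immediately instead of after a delay from a timer thread. It only shows why the close step has to go back into the mailbox while a timer registered in `endInput` is still pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model only; none of these names are Flink APIs. */
public class MailboxCloseSketch {
    // All actions run on one "main thread" by draining this queue.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private int pendingTimers = 0;

    /** Hypothetical timer registration: the callback is delivered as a mail. */
    private void registerTimer(Runnable callback) {
        pendingTimers++;
        mailbox.add(() -> {
            callback.run();
            pendingTimers--;
        });
    }

    /** Stand-in for endInput: it may register a timer. */
    private void endInput() {
        log.add("endInput");
        registerTimer(() -> log.add("timer"));
    }

    /** Stand-in for close: never blocks, re-enqueues itself while timers are pending. */
    private void closeWhenTimersDone() {
        if (pendingTimers > 0) {
            // Blocking here would stall the only thread able to run the timer mail.
            mailbox.add(this::closeWhenTimersDone);
            return;
        }
        log.add("close");
    }

    /** Runs the mailbox loop and returns the order in which the steps executed. */
    public static List<String> run() {
        MailboxCloseSketch task = new MailboxCloseSketch();
        task.mailbox.add(task::endInput);
        task.mailbox.add(task::closeWhenTimersDone);
        Runnable mail;
        while ((mail = task.mailbox.poll()) != null) {
            mail.run();
        }
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [endInput, timer, close]
    }
}
```

If `closeWhenTimersDone` waited in place instead of re-enqueuing itself, the timer mail behind it in the queue could never run in this model.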
The second approach is a good direction, but it involves changing APIs
(such as `Output`, `StreamOperator`, etc.) and some mechanisms, which needs
detailed discussion as a separate topic.
I propose to use the current implementation in this JIRA. What do you think?