zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r420178858
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -464,6 +472,20 @@ protected void beforeInvoke() throws Exception {
writer.readRecoveredState(getEnvironment().getTaskStateManager().getChannelStateReader());
}
}
+
+ // It would get possible benefits to recovery input side after output side, which guarantees the
+ // output can request more floating buffers from global firstly.
+ InputGate[] inputGates = getEnvironment().getAllInputGates();
+ if (inputGates != null) {
+ for (InputGate inputGate : inputGates) {
+ inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+ }
+
+ // Note that we must request partition after all the single gate finishes recovery.
+ for (InputGate inputGate : inputGates) {
+ inputGate.requestPartitions(channelIOExecutor);
+ }
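One possible reason the two-phase loop above satisfies the "request partition after recovery" note without explicit futures: if `channelIOExecutor` is single-threaded (an assumption, not confirmed by this diff) and `readRecoveredState` does all of its work in tasks submitted to that executor, then tasks run in submission order. Partition requests submitted after every recovery task therefore start only once recovery has finished. The minimal sketch below uses a hypothetical `Gate` class as a stand-in for `InputGate`; it is not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderingSketch {

    // Hypothetical stand-in for an InputGate; NOT Flink's API.
    public static final class Gate {
        private volatile boolean recovered;
        private volatile boolean requestedAfterAllRecovered;

        // Models readRecoveredState: all recovery work runs as one task on the executor.
        void readRecoveredState(ExecutorService executor) {
            executor.execute(() -> recovered = true);
        }

        // Models requestPartitions: checks, on the same executor, whether every gate has recovered.
        void requestPartitions(ExecutorService executor, List<Gate> allGates) {
            executor.execute(() ->
                    requestedAfterAllRecovered = allGates.stream().allMatch(g -> g.recovered));
        }

        public boolean requestedAfterAllRecovered() {
            return requestedAfterAllRecovered;
        }
    }

    public static List<Gate> run(int numGates) {
        // Assumption: the channel IO executor is single-threaded, so tasks run in FIFO order.
        ExecutorService channelIOExecutor = Executors.newSingleThreadExecutor();
        List<Gate> gates = new ArrayList<>();
        for (int i = 0; i < numGates; i++) {
            gates.add(new Gate());
        }
        try {
            // Same two-phase loop as in the diff: recover every gate, then request partitions.
            for (Gate gate : gates) {
                gate.readRecoveredState(channelIOExecutor);
            }
            for (Gate gate : gates) {
                gate.requestPartitions(channelIOExecutor, gates);
            }
        } finally {
            channelIOExecutor.shutdown();
        }
        try {
            if (!channelIOExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return gates;
    }

    public static void main(String[] args) {
        for (Gate gate : run(3)) {
            System.out.println("requested after all gates recovered: " + gate.requestedAfterAllRecovered());
        }
    }
}
```

If the executor had more than one thread, or if recovery continued asynchronously outside the submitted tasks, this ordering would no longer be guaranteed.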
Review comment:
Yeah, I actually also considered requesting the partitions from the mailbox
thread once all the futures returned by `inputGate.readRecoveredState` have
completed.
But I also recalled an existing case where the partition request is executed
by a thread other than the task's main thread: in
`SingleInputGate#updateInputChannel`, when an unknown channel is converted into
a local or remote channel, the partition is requested directly by the RPC
thread. Since that case works, I assumed that partition requests can be
executed by any thread without race conditions, so I took the current approach
instead to save some effort.
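The alternative mentioned above might look roughly like the following: request partitions from the mailbox thread only after every future returned by `readRecoveredState` has completed. This is a sketch under stated assumptions. `Gate`, its future-returning `readRecoveredState`, and the single-threaded executor standing in for the mailbox are all hypothetical, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MailboxPartitionRequestSketch {

    // Hypothetical gate whose recovery returns a future; NOT Flink's InputGate API.
    public static final class Gate {
        private volatile boolean recovered;
        private volatile boolean recoveredWhenRequested;
        private volatile String requestThread;

        CompletableFuture<Void> readRecoveredState(ExecutorService ioExecutor) {
            return CompletableFuture.runAsync(() -> recovered = true, ioExecutor);
        }

        // Records whether recovery had finished and which thread issued the request.
        void requestPartitions() {
            recoveredWhenRequested = recovered;
            requestThread = Thread.currentThread().getName();
        }

        public boolean recoveredWhenRequested() { return recoveredWhenRequested; }

        public String requestThread() { return requestThread; }
    }

    public static List<Gate> run(int numGates) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "channel-io"));
        // Single-threaded executor standing in for the task's mailbox thread (hypothetical).
        ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            List<Gate> gates = new ArrayList<>();
            List<CompletableFuture<Void>> recoveries = new ArrayList<>();
            for (int i = 0; i < numGates; i++) {
                Gate gate = new Gate();
                gates.add(gate);
                recoveries.add(gate.readRecoveredState(ioExecutor));
            }
            // Wait for every gate's recovery, then hop onto the mailbox thread to request partitions.
            CompletableFuture.allOf(recoveries.toArray(new CompletableFuture<?>[0]))
                    .thenRunAsync(() -> gates.forEach(Gate::requestPartitions), mailbox)
                    .join();
            return gates;
        } finally {
            ioExecutor.shutdown();
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        for (Gate gate : run(3)) {
            System.out.println(gate.requestThread() + " recovered=" + gate.recoveredWhenRequested());
        }
    }
}
```

The trade-off is one extra hop onto the mailbox thread, which avoids relying on partition requests being safe to issue from arbitrary threads. The approach in the diff skips that hop, based on the precedent in `SingleInputGate#updateInputChannel`.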
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]