zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r411465588
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -222,19 +225,65 @@ public SingleInputGate(
}
@Override
- public void setup() throws IOException, InterruptedException {
+ public void setup() throws IOException {
checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
// assign exclusive buffers to input channels directly and use the rest for floating buffers
assignExclusiveSegments();
BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);
+ }
- requestPartitions();
+ @Override
+ public void initializeStateAndRequestPartitions(
+ boolean hasStates,
+ @Nullable ExecutorService executor,
+ ChannelStateReader reader) throws Exception {
+
+ if (hasStates) {
+ checkNotNull(executor);
+ readRecoveredStateBeforeRequestPartition(executor, reader);
+ } else {
+ requestPartitions();
+ }
Review comment:
This method combines two responsibilities on purpose. TBH I was a bit torn
about doing so.
My reasoning was to leave the normal path untouched when unaligned mode is not
enabled, i.e. to request partitions directly. Otherwise short-lived jobs might
be sensitive to a potential performance regression caused by an unnecessary
state recovery step. Passing a `@Nullable` executor also simplifies the unit
tests that do not use unaligned mode, since they do not have to maintain a
useless `executor`.
The alternative is to split it into two methods, one for `#requestPartitions`
and the other for `readRecoveredState`. But then we would also need to define
another explicit interface method, `InputGate#requestPartitions`, which adds
more small steps to the lifecycle of `InputGate`: `setup` ->
`readRecoveredState` -> `requestPartitions`. So I chose the inlined approach to
avoid that overhead.
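
To make the trade-off concrete, here is a minimal sketch of the two lifecycles side by side. It is not the actual `SingleInputGate` code: `InlinedGate`, `SplitGate`, and the `steps` list are hypothetical stand-ins that only record the order of calls, the `ChannelStateReader` parameter is left out, and the assumption that state recovery is followed by the partition request is inferred from the method name `readRecoveredStateBeforeRequestPartition`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GateLifecycleSketch {

    /** Hypothetical stand-in for the inlined design: one lifecycle step after setup(). */
    static class InlinedGate {
        final List<String> steps = new ArrayList<>();

        void setup() {
            steps.add("setup");
        }

        // The executor may be null when there is no state to restore.
        void initializeStateAndRequestPartitions(boolean hasStates, ExecutorService executor)
                throws Exception {
            if (hasStates) {
                if (executor == null) {
                    throw new NullPointerException("executor is required when restoring state");
                }
                // Assumption: recovery runs on the executor and is followed by the request.
                // The sketch blocks on the future only to keep the recorded order deterministic.
                executor.submit(() -> {
                    steps.add("readRecoveredState");
                    steps.add("requestPartitions");
                }).get();
            } else {
                // Normal path: no recovery work at all.
                steps.add("requestPartitions");
            }
        }
    }

    /** Hypothetical stand-in for the split design: the caller drives every step. */
    static class SplitGate {
        final List<String> steps = new ArrayList<>();

        void setup() {
            steps.add("setup");
        }

        void readRecoveredState(ExecutorService executor) throws Exception {
            executor.submit(() -> steps.add("readRecoveredState")).get();
        }

        void requestPartitions() {
            steps.add("requestPartitions");
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            InlinedGate plain = new InlinedGate();
            plain.setup();
            plain.initializeStateAndRequestPartitions(false, null);
            System.out.println("inlined, no state:   " + plain.steps);

            InlinedGate restored = new InlinedGate();
            restored.setup();
            restored.initializeStateAndRequestPartitions(true, executor);
            System.out.println("inlined, with state: " + restored.steps);

            SplitGate split = new SplitGate();
            split.setup();
            split.readRecoveredState(executor);
            split.requestPartitions();
            System.out.println("split, with state:   " + split.steps);
        } finally {
            executor.shutdown();
        }
    }
}
```

In the inlined variant the caller makes a single call after `setup` and can pass `null` for the executor on the normal path. In the split variant the caller has to sequence `readRecoveredState` and `requestPartitions` itself, which is the extra lifecycle step mentioned above.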