pnowojski commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r411488676



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -222,19 +225,65 @@ public SingleInputGate(
        }
 
        @Override
-       public void setup() throws IOException, InterruptedException {
+       public void setup() throws IOException {
                checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
                // assign exclusive buffers to input channels directly and use the rest for floating buffers
                assignExclusiveSegments();
 
                BufferPool bufferPool = bufferPoolFactory.get();
                setBufferPool(bufferPool);
+       }
 
-               requestPartitions();
+       @Override
+       public void initializeStateAndRequestPartitions(
+                       boolean hasStates,
+                       @Nullable ExecutorService executor,
+                       ChannelStateReader reader) throws Exception {
+
+               if (hasStates) {
+                       checkNotNull(executor);
+                       readRecoveredStateBeforeRequestPartition(executor, reader);
+               } else {
+                       requestPartitions();
+               }

Review comment:
   Ok, I see. This is messy :/ It comes down to the fact that input gates are initialised in `Task`, while state is handled in `StreamTask` :/ Both should have been handled together when constructing `SingleInputGate`.
   
   I think I would be +0.1 for dropping `initializeStateAndRequestPartitions` and adding both `readRecoveredStateBeforeRequestPartition` and `requestPartitions` to the interfaces instead, but if you think it's better to have a single method, I'm fine with that.
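
To make the two options concrete, here is a minimal sketch. Everything in it is a simplified, hypothetical stand-in rather than the real Flink code: `CombinedGate`, `SplitGate`, `RecordingGate` and `setUpWithSplitApi` are made-up names, and the `executor` and `reader` parameters from the diff are left out. It only shows where the `hasStates` branch lives in each variant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT the real Flink interfaces.
public class GateApiSketch {

    /** Option A: a single combined entry point, as in the diff under review. */
    interface CombinedGate {
        void initializeStateAndRequestPartitions(boolean hasStates);
    }

    /** Option B: both steps sit on the interface and the caller picks one. */
    interface SplitGate {
        void readRecoveredStateBeforeRequestPartition();

        void requestPartitions();
    }

    /** Records which step ran so the two call paths can be compared. */
    static final class RecordingGate implements CombinedGate, SplitGate {
        final List<String> calls = new ArrayList<>();

        @Override
        public void initializeStateAndRequestPartitions(boolean hasStates) {
            // With option A the branch is hidden inside the gate.
            if (hasStates) {
                readRecoveredStateBeforeRequestPartition();
            } else {
                requestPartitions();
            }
        }

        @Override
        public void readRecoveredStateBeforeRequestPartition() {
            calls.add("readRecoveredStateBeforeRequestPartition");
        }

        @Override
        public void requestPartitions() {
            calls.add("requestPartitions");
        }
    }

    /** With option B the same branch moves to the caller. */
    static void setUpWithSplitApi(SplitGate gate, boolean hasStates) {
        if (hasStates) {
            gate.readRecoveredStateBeforeRequestPartition();
        } else {
            gate.requestPartitions();
        }
    }

    public static void main(String[] args) {
        for (boolean hasStates : new boolean[] {true, false}) {
            RecordingGate combined = new RecordingGate();
            combined.initializeStateAndRequestPartitions(hasStates);

            RecordingGate split = new RecordingGate();
            setUpWithSplitApi(split, hasStates);

            System.out.println(hasStates + ": " + combined.calls + " / " + split.calls);
        }
    }
}
```

Both variants end up running the same step. The difference is only whether the interface exposes one method that hides the branch, or two methods and leaves the branch to the caller.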



