pnowojski commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r413065523



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -222,19 +226,54 @@ public SingleInputGate(
        }
 
        @Override
-       public void setup() throws IOException, InterruptedException {
+       public void setup() throws IOException {
                checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
                // assign exclusive buffers to input channels directly and use the rest for floating buffers
                assignExclusiveSegments();
 
                BufferPool bufferPool = bufferPoolFactory.get();
                setBufferPool(bufferPool);
+       }
+
+       @Override
+       public void readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+               internalRequestPartitions(() -> executor.submit(() -> {
+
+                       for (InputChannel inputChannel : inputChannels.values()) {
+                               if (inputChannel instanceof RemoteInputChannel) {
+                                       try {
+                                               ((RemoteInputChannel) inputChannel).readRecoveredState(reader);
+                                       } catch (Throwable t) {
+                                               inputChannel.setError(t);
+                                               return;
+                                       }
+                               }

Review comment:
       I think this block of code forgets about the `getUnconsumedBuffer()` data from the `LocalInputChannel`, which was also spilled as part of the input data. The loop only handles `RemoteInputChannel`, so nothing is read back for local channels.
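
       To illustrate the concern, here is a minimal, self-contained sketch. It does not use Flink's classes: `Channel`, `RemoteChannel`, `LocalChannel`, `recoverRemoteOnly` and `recoverAll` are all hypothetical stand-ins, and how a local channel would actually read its spilled buffers back is an assumption that this PR would still need to settle. The sketch only shows that an `instanceof` filter on the remote type silently skips local channels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecoverySketch {

    /** Hypothetical stand-in for an input channel; this is NOT Flink's InputChannel. */
    interface Channel {
        String name();
    }

    /** Hypothetical stand-in for a remote channel. */
    static final class RemoteChannel implements Channel {
        @Override
        public String name() {
            return "remote";
        }
    }

    /** Hypothetical stand-in for a local channel whose spilled data also needs recovery. */
    static final class LocalChannel implements Channel {
        @Override
        public String name() {
            return "local";
        }
    }

    /** Mirrors the loop in the diff: only remote channels are recovered. */
    static List<String> recoverRemoteOnly(List<Channel> channels) {
        List<String> recovered = new ArrayList<>();
        for (Channel channel : channels) {
            if (channel instanceof RemoteChannel) {
                recovered.add(channel.name());
            }
        }
        return recovered;
    }

    /** Assumed alternative: every channel kind is recovered, local ones included. */
    static List<String> recoverAll(List<Channel> channels) {
        List<String> recovered = new ArrayList<>();
        for (Channel channel : channels) {
            recovered.add(channel.name());
        }
        return recovered;
    }

    public static void main(String[] args) {
        List<Channel> channels = Arrays.asList(new RemoteChannel(), new LocalChannel());
        System.out.println("remote only: " + recoverRemoteOnly(channels));
        System.out.println("all channels: " + recoverAll(channels));
    }
}
```

       In the sketch, the remote-only variant never touches the local channel, which is the gap I suspect in `readRecoveredState`.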





