zjureel commented on code in PR #19380:
URL: https://github.com/apache/flink/pull/19380#discussion_r974066032


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java:
##########
@@ -243,6 +246,22 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exc
             if (toRelease != null) {
                 releaseViewReader(toRelease);
             }
+        } else if (msg.getClass() == PartitionRequestNotifierTimeout.class) {
+            PartitionRequestNotifierTimeout partitionRequestNotifierTimeout = (PartitionRequestNotifierTimeout) msg;
+
+            // Send partition not found message to the downstream task when the notifier is timeout.
+            final PartitionRequestNotifier partitionRequestNotifier = partitionRequestNotifierTimeout
+                    .getPartitionRequestNotifier();
+            final ResultPartitionID resultPartitionId = partitionRequestNotifier.getResultPartitionId();
+            final InputChannelID inputChannelId = partitionRequestNotifier.getReceiverId();
+            availableReaders.remove(partitionRequestNotifier.getViewReader());
+            allReaders.remove(inputChannelId);

Review Comment:
   When the Netty server receives a partition request, it creates a reader and 
adds it to allReaders even if the partition is not registered yet. Once the 
notifier times out, the partition request listener for that partition may no 
longer be registered in the TM, so this reader should be removed here.
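
   To illustrate the bookkeeping I mean, here is a minimal sketch. It is not 
the actual PartitionRequestQueue code: the class ReaderCleanupSketch, the 
Reader type, the String receiver ids and the method names are all 
hypothetical stand-ins. Only the two collections, allReaders and 
availableReaders, mirror the fields touched in the diff above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Simplified, hypothetical stand-in for the reader bookkeeping discussed above. */
class ReaderCleanupSketch {

    /** Hypothetical placeholder for a view reader; it only carries the receiver id. */
    static final class Reader {
        final String receiverId;

        Reader(String receiverId) {
            this.receiverId = receiverId;
        }
    }

    // Every reader created for a partition request, keyed by receiver id.
    private final Map<String, Reader> allReaders = new HashMap<>();
    // Readers that currently have data to send.
    private final Queue<Reader> availableReaders = new ArrayDeque<>();

    /** The reader is tracked in allReaders even when the partition is not registered yet. */
    Reader onPartitionRequest(String receiverId, boolean partitionRegistered) {
        Reader reader = new Reader(receiverId);
        allReaders.put(receiverId, reader);
        if (partitionRegistered) {
            availableReaders.add(reader);
        }
        return reader;
    }

    /** On notifier timeout, drop the reader from both collections so it does not leak. */
    boolean onNotifierTimeout(String receiverId) {
        Reader reader = allReaders.remove(receiverId);
        if (reader == null) {
            return false; // nothing was tracked for this receiver
        }
        availableReaders.remove(reader);
        return true;
    }

    int readerCount() {
        return allReaders.size();
    }

    int availableCount() {
        return availableReaders.size();
    }

    public static void main(String[] args) {
        ReaderCleanupSketch queue = new ReaderCleanupSketch();
        queue.onPartitionRequest("channel-1", false);
        System.out.println("tracked before timeout: " + queue.readerCount());
        queue.onNotifierTimeout("channel-1");
        System.out.println("tracked after timeout: " + queue.readerCount());
    }
}
```

   Without the removal in onNotifierTimeout, the entry for a partition that 
never gets registered would stay in allReaders until the connection is 
closed.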



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.