1996fanrui commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027398763


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +166,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client reuse when the netty server closes the channel and there is no
+     * input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+        NettyProtocol protocol =
+                new NettyProtocol(null, null) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {
+                            new ChannelInactiveFutureHandler(inactiveFuture)
+                        };
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        assertThat(oldClient.hasError()).isFalse();
+
+        // Write something to trigger close by server
+        oldClient.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+        inactiveFuture.get();
+        assertThat(oldClient.hasError()).isTrue();
+
+        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+        assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+        shutdown(serverAndClient);

Review Comment:
   Thanks for the great suggestion. It makes sense, and I have updated the test.
   
   BTW, I removed `assertThat(serverChannelFuture).isDone();` because `channelRegistered` is not called synchronously, so the unit test sometimes fails.
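
A minimal sketch of that timing issue, using only JDK classes rather than the Flink or Netty code in this PR. The class `AsyncCompletionDemo`, the helper `awaitCompletion`, and the single-thread executor standing in for an event loop are all hypothetical names chosen for illustration. It shows why an immediate `isDone()` check on a future completed by another thread can observe `false`, while a bounded wait does not depend on scheduling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncCompletionDemo {

    /** Hypothetical helper: waits up to timeoutMillis and reports whether the future completed. */
    public static boolean awaitCompletion(CompletableFuture<?> future, long timeoutMillis)
            throws Exception {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for an event loop thread (an assumption, not Netty's actual threading model).
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> registered = new CompletableFuture<>();
        CountDownLatch release = new CountDownLatch(1);

        // The callback runs on another thread and is held back until the latch is released,
        // which makes the "not yet completed" window deterministic for this demo.
        eventLoop.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            registered.complete(null);
        });

        // An immediate isDone() check sees the future before the callback has run.
        System.out.println("done before release: " + registered.isDone());

        // Waiting with a timeout succeeds regardless of when the callback is scheduled.
        release.countDown();
        System.out.println("completed in time: " + awaitCompletion(registered, 5000));

        eventLoop.shutdown();
    }
}
```

In a real test the window is not controlled by a latch, so whether an `isDone()` assertion passes depends on thread scheduling; that would be consistent with the intermittent failure described above.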



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
