reswqa commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027297678


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client reuse when the netty server closes the channel and there is no
+     * input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        NettyProtocol protocol =
+                new NettyProtocol(
+                        mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {

Review Comment:
   We should not introduce mocks into the test class. Please refer to [Avoid Mockito](https://flink.apache.org/contributing/code-style-and-quality-common.html).
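
   As a rough sketch of the alternative, a small hand-written test double can stand in for the mocked collaborator. The interface, class, and method names below (`PartitionProvider`, `RecordingPartitionProvider`, `partitionExists`) are hypothetical and only illustrate the pattern; they are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class HandWrittenStubExample {

    /** Hypothetical stand-in for a collaborator interface; not a real Flink type. */
    interface PartitionProvider {
        Optional<String> lookup(String partitionId);
    }

    /** Hand-written test double: records every call and always reports "not found". */
    static final class RecordingPartitionProvider implements PartitionProvider {
        final List<String> requestedIds = new ArrayList<>();

        @Override
        public Optional<String> lookup(String partitionId) {
            requestedIds.add(partitionId);
            return Optional.empty();
        }
    }

    /** Hypothetical code under test; it depends only on the interface. */
    static boolean partitionExists(PartitionProvider provider, String partitionId) {
        return provider.lookup(partitionId).isPresent();
    }

    public static void main(String[] args) {
        RecordingPartitionProvider provider = new RecordingPartitionProvider();
        boolean exists = partitionExists(provider, "p-1");
        // The double exposes what it was asked for, so no mocking framework is needed.
        System.out.println(exists + " " + provider.requestedIds);
    }
}
```

   The test then asserts on the state recorded by the double rather than on mock verification calls.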



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
         return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
     }
 
+    @VisibleForTesting
+    public Channel getTcpChannel() {

Review Comment:
   I think we can get the handler directly through the channel (via `channel.pipeline().get(NetworkClientHandler.class)`), so we don't need to introduce the next method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
         return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
     }
 
+    @VisibleForTesting
+    public Channel getTcpChannel() {
+        return tcpChannel;
+    }
+
+    @VisibleForTesting

Review Comment:
   These two `@VisibleForTesting` methods should probably be avoided. For the connection reuse scenario, we are doing black-box testing and should rely less on internal state to make assertions. I suggest that we consider another approach to trigger the close of the serverHandler.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client reuse when the netty server closes the channel and there is no
+     * input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        NettyProtocol protocol =
+                new NettyProtocol(
+                        mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {new ChannelInactiveFutureHandler()};
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        ChannelInactiveFutureHandler clientHandler =
+                (ChannelInactiveFutureHandler) oldClient.getClientHandler();
+        assertFalse(clientHandler.hasChannelError());

Review Comment:
   Please use AssertJ instead of JUnit assertions, e.g. `assertThat(clientHandler.hasChannelError()).isFalse()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
