This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 67eefc4ba [common] Fix serverConnection that has already been set to 
disconnected is still processing requests error (#1722)
67eefc4ba is described below

commit 67eefc4bae5f91fa105aeca7a8f3174772d5ed75
Author: yunhong <[email protected]>
AuthorDate: Fri Sep 19 22:28:01 2025 +0800

    [common] Fix serverConnection that has already been set to disconnected is 
still processing requests error (#1722)
---
 .../fluss/rpc/netty/client/ServerConnection.java   |  52 +++-----
 .../fluss/rpc/netty/client/NettyClientTest.java    |   7 +-
 .../rpc/netty/client/ServerConnectionTest.java     | 141 +++++++++++++++++++++
 3 files changed, 164 insertions(+), 36 deletions(-)

diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index f07afdf9e..26ceb6bc0 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.rpc.netty.client;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.exception.DisconnectException;
 import org.apache.fluss.exception.FlussRuntimeException;
@@ -170,39 +171,18 @@ final class ServerConnection {
             }
 
             if (channel != null) {
-                channel.close()
-                        .addListener(
-                                (ChannelFutureListener)
-                                        future -> {
-
-                                            // when finishing, if netty 
successfully closes the
-                                            // channel, then the provided 
exception is used as
-                                            // the reason for the closing. If 
there was something
-                                            // wrong at the netty side, then 
that exception is
-                                            // prioritized over the provided 
one.
-                                            if (future.isSuccess()) {
-                                                if (cause instanceof 
ClosedChannelException) {
-                                                    // the 
ClosedChannelException is expected
-                                                    closeFuture.complete(null);
-                                                } else {
-                                                    
closeFuture.completeExceptionally(cause);
-                                                }
-                                            } else {
-                                                LOG.warn(
-                                                        "Something went wrong 
when trying to close connection due to : ",
-                                                        cause);
-                                                
closeFuture.completeExceptionally(future.cause());
-                                            }
-                                        });
+                // Close the channel directly, without waiting for the channel 
to close properly.
+                channel.close();
+            }
+
+            // TODO all return completeExceptionally will let some test cases 
blocked, so we
+            // need to find why the test cases are blocked and remove the if 
statement.
+            if (cause instanceof ClosedChannelException
+                    || cause.getCause() instanceof ConnectException) {
+                // the ClosedChannelException and ConnectException is expected.
+                closeFuture.complete(null);
             } else {
-                // TODO all return completeExceptionally will let some test 
cases blocked, so we
-                // need to find why the test cases are blocked and remove the 
if statement.
-                if (cause.getCause() instanceof ConnectException) {
-                    // the ConnectException is expected
-                    closeFuture.complete(null);
-                } else {
-                    closeFuture.completeExceptionally(cause);
-                }
+                closeFuture.completeExceptionally(cause);
             }
 
             connectionMetricGroup.close();
@@ -491,7 +471,8 @@ final class ServerConnection {
      * <li>READY: connection is ready to send requests.
      * <li>DISCONNECTED: connection is failed to establish.
      */
-    private enum ConnectionState {
+    @VisibleForTesting
+    enum ConnectionState {
         CONNECTING,
         CHECKING_API_VERSIONS,
         AUTHENTICATING,
@@ -565,4 +546,9 @@ final class ServerConnection {
             return ((InetSocketAddress) 
channel.remoteAddress()).getAddress().getHostAddress();
         }
     }
+
+    @VisibleForTesting
+    ConnectionState getConnectionState() {
+        return state;
+    }
 }
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
index 72699f2d3..e7f7584f6 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -156,9 +157,9 @@ final class NettyClientTest {
                                 nettyClient
                                         .sendRequest(serverNode, 
ApiKeys.API_VERSIONS, request)
                                         .get())
-                .isInstanceOf(ExecutionException.class)
-                .hasMessageContaining("Disconnected from node")
-                .hasRootCauseMessage("finishConnect(..) failed: Connection 
refused");
+                .rootCause()
+                .isInstanceOf(ConnectException.class)
+                .hasMessageContaining("Connection refused");
         assertThat(nettyClient.connections().size()).isEqualTo(0);
 
         // restart the netty server.
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
new file mode 100644
index 000000000..86328f6da
--- /dev/null
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.netty.client;
+
+import org.apache.fluss.cluster.Endpoint;
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.DisconnectException;
+import org.apache.fluss.metrics.groups.MetricGroup;
+import org.apache.fluss.metrics.util.NOPMetricsGroup;
+import org.apache.fluss.rpc.TestingGatewayService;
+import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
+import org.apache.fluss.rpc.netty.server.NettyServer;
+import org.apache.fluss.rpc.netty.server.RequestsMetrics;
+import org.apache.fluss.rpc.protocol.ApiKeys;
+import org.apache.fluss.security.auth.AuthenticationFactory;
+import org.apache.fluss.security.auth.ClientAuthenticator;
+import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.fluss.utils.NetUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
+import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
+import static org.apache.fluss.utils.NetUtils.getAvailablePort;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ServerConnection}. */
+public class ServerConnectionTest {
+
+    private EventLoopGroup eventLoopGroup;
+    private Bootstrap bootstrap;
+    private ClientAuthenticator clientAuthenticator;
+    private Configuration conf;
+    private NettyServer nettyServer;
+    private ServerNode serverNode;
+    private TestingGatewayService service;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        conf = new Configuration();
+        buildNettyServer(0);
+
+        eventLoopGroup = newEventLoopGroup(1, "fluss-netty-client-test");
+        bootstrap =
+                new Bootstrap()
+                        .group(eventLoopGroup)
+                        .channel(getClientSocketChannelClass(eventLoopGroup))
+                        .handler(new ClientChannelInitializer(5000));
+        clientAuthenticator =
+                AuthenticationFactory.loadClientAuthenticatorSupplier(new 
Configuration()).get();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (nettyServer != null) {
+            nettyServer.close();
+        }
+
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully();
+        }
+    }
+
+    @Test
+    void testConnectionClose() {
+        ServerConnection connection =
+                new ServerConnection(
+                        bootstrap,
+                        serverNode,
+                        TestingClientMetricGroup.newInstance(),
+                        clientAuthenticator,
+                        false);
+        ConnectionState connectionState = connection.getConnectionState();
+        assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);
+
+        GetTableSchemaRequest request =
+                new GetTableSchemaRequest()
+                        .setTablePath(
+                                new 
PbTablePath().setDatabaseName("test").setTableName("test"))
+                        .setSchemaId(0);
+        connection.send(ApiKeys.GET_TABLE_SCHEMA, request);
+
+        CompletableFuture<Void> future = connection.close();
+        connectionState = connection.getConnectionState();
+        assertThat(connectionState).isEqualTo(ConnectionState.DISCONNECTED);
+        assertThat(future.isDone()).isTrue();
+
+        assertThatThrownBy(() -> connection.send(ApiKeys.GET_TABLE_SCHEMA, 
request).get())
+                .rootCause()
+                .isInstanceOf(DisconnectException.class)
+                .hasMessageContaining("Cannot send request to server");
+        future = connection.close();
+        assertThat(future.isDone()).isTrue();
+    }
+
+    private void buildNettyServer(int serverId) throws Exception {
+        try (NetUtils.Port availablePort = getAvailablePort()) {
+            serverNode =
+                    new ServerNode(
+                            serverId, "localhost", availablePort.getPort(), 
ServerType.COORDINATOR);
+            service = new TestingGatewayService();
+            MetricGroup metricGroup = NOPMetricsGroup.newInstance();
+            nettyServer =
+                    new NettyServer(
+                            conf,
+                            Collections.singleton(
+                                    new Endpoint(serverNode.host(), 
serverNode.port(), "INTERNAL")),
+                            service,
+                            metricGroup,
+                            
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
+            nettyServer.start();
+        }
+    }
+}

Reply via email to