This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 997b14653 [client] Fix exception occurs when initializing server 
connection. (#1740)
997b14653 is described below

commit 997b1465342683b25fe14ab2ebec0aa0daa2cb02
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Sep 27 13:47:38 2025 +0800

    [client] Fix exception occurs when initializing server connection. (#1740)
---
 .../apache/fluss/rpc/netty/client/NettyClient.java    | 16 +++++++---------
 .../fluss/rpc/netty/client/ServerConnection.java      | 15 ++++++++++-----
 .../fluss/rpc/netty/client/NettyClientTest.java       | 19 +++++++++++++++++++
 .../fluss/rpc/netty/client/ServerConnectionTest.java  |  1 +
 4 files changed, 37 insertions(+), 14 deletions(-)

diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
index 3334e392a..f567b8584 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
@@ -192,15 +192,13 @@ public final class NettyClient implements RpcClient {
                 serverId,
                 ignored -> {
                     LOG.debug("Creating connection to server {}.", node);
-                    ServerConnection connection =
-                            new ServerConnection(
-                                    bootstrap,
-                                    node,
-                                    clientMetricGroup,
-                                    authenticatorSupplier.get(),
-                                    isInnerClient);
-                    connection.whenClose(ignore -> 
connections.remove(serverId, connection));
-                    return connection;
+                    return new ServerConnection(
+                            bootstrap,
+                            node,
+                            clientMetricGroup,
+                            authenticatorSupplier.get(),
+                            (con, ignore) -> connections.remove(serverId, con),
+                            isInnerClient);
                 });
     }
 
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 26ceb6bc0..3aa85e24f 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -58,7 +58,7 @@ import java.util.ArrayDeque;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 
 import static org.apache.fluss.utils.IOUtils.closeQuietly;
 
@@ -104,15 +104,20 @@ final class ServerConnection {
             ServerNode node,
             ClientMetricGroup clientMetricGroup,
             ClientAuthenticator authenticator,
+            BiConsumer<ServerConnection, Throwable> closeCallback,
             boolean isInnerClient) {
         this.node = node;
         this.state = ConnectionState.CONNECTING;
         this.connectionMetricGroup = 
clientMetricGroup.createConnectionMetricGroup(node.uid());
+        this.authenticator = authenticator;
+        this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
+        whenClose(closeCallback);
+
+        // connect and handle should be last in case of other variables are 
nullable and close
+        // callback is not registered when connection established.
         bootstrap
                 .connect(node.host(), node.port())
                 .addListener(future -> establishConnection((ChannelFuture) 
future, isInnerClient));
-        this.authenticator = authenticator;
-        this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
     }
 
     public ServerNode getServerNode() {
@@ -131,8 +136,8 @@ final class ServerConnection {
     }
 
     /** Register a callback to be called when the connection is closed. */
-    public void whenClose(Consumer<Throwable> closeCallback) {
-        closeFuture.whenComplete((v, throwable) -> 
closeCallback.accept(throwable));
+    private void whenClose(BiConsumer<ServerConnection, Throwable> 
closeCallback) {
+        closeFuture.whenComplete((v, throwable) -> closeCallback.accept(this, 
throwable));
     }
 
     /** Close the connection. */
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
index e7f7584f6..94b79b556 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
@@ -234,6 +234,25 @@ final class NettyClientTest {
         }
     }
 
+    @Test
+    void testExceptionWhenInitializeServerConnection() throws Exception {
+        ApiVersionsRequest request =
+                new ApiVersionsRequest()
+                        .setClientSoftwareName("testing_client_100")
+                        .setClientSoftwareVersion("1.0");
+        // close the netty server.
+        nettyServer.close();
+
+        // send request and create server connection.
+        assertThatThrownBy(
+                        () ->
+                                nettyClient
+                                        .sendRequest(serverNode, 
ApiKeys.API_VERSIONS, request)
+                                        .get())
+                .hasMessageContaining("Disconnected from node");
+        assertThat(nettyClient.connections()).isEmpty();
+    }
+
     private void buildNettyServer(int serverId) throws Exception {
         try (NetUtils.Port availablePort = getAvailablePort()) {
             serverNode =
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 86328f6da..6b2363c88 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -96,6 +96,7 @@ public class ServerConnectionTest {
                         serverNode,
                         TestingClientMetricGroup.newInstance(),
                         clientAuthenticator,
+                        (con, ignore) -> {},
                         false);
         ConnectionState connectionState = connection.getConnectionState();
         assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);

Reply via email to