This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d85f679c [client] Check whether request is removed from inflight 
requests to avoid NullPointerException when connection is closed. (#2127)
8d85f679c is described below

commit 8d85f679ca98c4055f9e1e8f8e5e9e94314424e0
Author: vamossagar12 <[email protected]>
AuthorDate: Mon Dec 22 07:47:09 2025 +0530

    [client] Check whether request is removed from inflight requests to avoid 
NullPointerException when connection is closed. (#2127)
---
 .../org/apache/fluss/rpc/netty/client/ServerConnection.java    | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index a09a50c09..2b52d87f6 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -166,7 +166,9 @@ final class ServerConnection {
             // notify all the inflight requests
             for (int requestId : inflightRequests.keySet()) {
                 InflightRequest request = inflightRequests.remove(requestId);
-                request.responseFuture.completeExceptionally(requestCause);
+                if (request != null) {
+                    request.responseFuture.completeExceptionally(requestCause);
+                }
             }
 
             // notify all the pending requests
@@ -249,6 +251,12 @@ final class ServerConnection {
     private void establishConnection(ChannelFuture future, boolean 
isInnerClient) {
         synchronized (lock) {
             if (future.isSuccess()) {
+                if (state.isDisconnected()) {
+                    LOG.debug(
+                            "Connection established to {} but connection is 
already closed.", node);
+                    future.channel().close();
+                    return;
+                }
                 LOG.debug("Established connection to server {}.", node);
                 channel = future.channel();
                 channel.pipeline()

Reply via email to