This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 8d85f679c [client] Check whether request is removed from inflight
requests to avoid NullPointerException when connection is closed. (#2127)
8d85f679c is described below
commit 8d85f679ca98c4055f9e1e8f8e5e9e94314424e0
Author: vamossagar12 <[email protected]>
AuthorDate: Mon Dec 22 07:47:09 2025 +0530
[client] Check whether request is removed from inflight requests to avoid
NullPointerException when connection is closed. (#2127)
---
.../org/apache/fluss/rpc/netty/client/ServerConnection.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index a09a50c09..2b52d87f6 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -166,7 +166,9 @@ final class ServerConnection {
// notify all the inflight requests
for (int requestId : inflightRequests.keySet()) {
InflightRequest request = inflightRequests.remove(requestId);
- request.responseFuture.completeExceptionally(requestCause);
+ if (request != null) {
+ request.responseFuture.completeExceptionally(requestCause);
+ }
}
// notify all the pending requests
@@ -249,6 +251,12 @@ final class ServerConnection {
private void establishConnection(ChannelFuture future, boolean
isInnerClient) {
synchronized (lock) {
if (future.isSuccess()) {
+ if (state.isDisconnected()) {
+ LOG.debug(
+ "Connection established to {} but connection is
already closed.", node);
+ future.channel().close();
+ return;
+ }
LOG.debug("Established connection to server {}.", node);
channel = future.channel();
channel.pipeline()