This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 38323a376 [CELEBORN-2068] TransportClientFactory should close channel explicitly to avoid resource leak for timeout or failure
38323a376 is described below

commit 38323a376efe4b2a22589fa0273ef99b4526ef0c
Author: SteNicholas <[email protected]>
AuthorDate: Fri Jul 18 17:50:08 2025 +0800

    [CELEBORN-2068] TransportClientFactory should close channel explicitly to avoid resource leak for timeout or failure
    
    ### What changes were proposed in this pull request?
    
    `TransportClientFactory` should close the channel explicitly to avoid resource leaks on connection timeout or failure.
    
    ### Why are the changes needed?
    
    `TransportClientFactory#internalCreateClient` can leak the underlying channel when connecting times out or fails, because the channel is not closed before the exception is thrown.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual test.
    
    Closes #3368 from SteNicholas/CELEBORN-2068.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit cf3c05d6683eb1f96895478ba3e9c2668f3aaca2)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../common/network/client/TransportClientFactory.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 92ec61b70..fab9067bb 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -285,7 +285,6 @@ public class TransportClientFactory implements Closeable {
     }
 
     final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
-    final AtomicReference<Channel> channelRef = new AtomicReference<>();
 
     bootstrap.handler(
         new ChannelInitializer<SocketChannel>() {
@@ -293,7 +292,6 @@ public class TransportClientFactory implements Closeable {
           public void initChannel(SocketChannel ch) {
             TransportChannelHandler clientHandler = 
context.initializePipeline(ch, decoder, true);
             clientRef.set(clientHandler.getClient());
-            channelRef.set(ch);
           }
         });
 
@@ -309,9 +307,11 @@ public class TransportClientFactory implements Closeable {
         throw new IOException(String.format("Failed to connect to %s", 
address), cf.cause());
       }
     } else if (!cf.await(connectTimeoutMs)) {
+      closeChannel(cf);
       throw new CelebornIOException(
           String.format("Connecting to %s timed out (%s ms)", address, 
connectTimeoutMs));
     } else if (cf.cause() != null) {
+      closeChannel(cf);
       throw new CelebornIOException(String.format("Failed to connect to %s", 
address), cf.cause());
     }
     if (context.sslEncryptionEnabled()) {
@@ -331,12 +331,12 @@ public class TransportClientFactory implements Closeable {
                             "failed to complete TLS handshake to {}",
                             address,
                             handshakeFuture.cause());
-                        cf.channel().close();
+                        closeChannel(cf);
                       }
                     }
                   });
       if (!future.await(connectionTimeoutMs)) {
-        cf.channel().close();
+        closeChannel(cf);
         throw new IOException(
             String.format("Failed to connect to %s within connection timeout", 
address));
       }
@@ -395,4 +395,12 @@ public class TransportClientFactory implements Closeable {
   public TransportContext getContext() {
     return context;
   }
+
+  private void closeChannel(ChannelFuture channelFuture) {
+    try {
+      channelFuture.channel().close();
+    } catch (Exception e) {
+      logger.warn("Failed to close channel", e);
+    }
+  }
 }

Reply via email to