This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 38323a376 [CELEBORN-2068] TransportClientFactory should close channel
explicitly to avoid resource leak for timeout or failure
38323a376 is described below
commit 38323a376efe4b2a22589fa0273ef99b4526ef0c
Author: SteNicholas <[email protected]>
AuthorDate: Fri Jul 18 17:50:08 2025 +0800
[CELEBORN-2068] TransportClientFactory should close channel explicitly to
avoid resource leak for timeout or failure
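### What changes were proposed in this pull request?
`TransportClientFactory` now closes the channel explicitly when connecting
times out or fails, to avoid leaking the channel's resources. A private
`closeChannel` helper closes the channel and logs a warning if closing throws.
It replaces the existing direct `cf.channel().close()` calls, and it is also
called before the exceptions thrown on connect timeout and connect failure.
The `channelRef` variable is removed.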
### Why are the changes needed?
`TransportClientFactory#internalCreateClient` threw an exception on connect
timeout or connect failure without closing the channel, which risked leaking
the channel's resources.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #3368 from SteNicholas/CELEBORN-2068.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit cf3c05d6683eb1f96895478ba3e9c2668f3aaca2)
Signed-off-by: Wang, Fei <[email protected]>
---
.../common/network/client/TransportClientFactory.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 92ec61b70..fab9067bb 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -285,7 +285,6 @@ public class TransportClientFactory implements Closeable {
}
final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
- final AtomicReference<Channel> channelRef = new AtomicReference<>();
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@@ -293,7 +292,6 @@ public class TransportClientFactory implements Closeable {
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler =
context.initializePipeline(ch, decoder, true);
clientRef.set(clientHandler.getClient());
- channelRef.set(ch);
}
});
@@ -309,9 +307,11 @@ public class TransportClientFactory implements Closeable {
throw new IOException(String.format("Failed to connect to %s",
address), cf.cause());
}
} else if (!cf.await(connectTimeoutMs)) {
+ closeChannel(cf);
throw new CelebornIOException(
String.format("Connecting to %s timed out (%s ms)", address,
connectTimeoutMs));
} else if (cf.cause() != null) {
+ closeChannel(cf);
throw new CelebornIOException(String.format("Failed to connect to %s",
address), cf.cause());
}
if (context.sslEncryptionEnabled()) {
@@ -331,12 +331,12 @@ public class TransportClientFactory implements Closeable {
"failed to complete TLS handshake to {}",
address,
handshakeFuture.cause());
- cf.channel().close();
+ closeChannel(cf);
}
}
});
if (!future.await(connectionTimeoutMs)) {
- cf.channel().close();
+ closeChannel(cf);
throw new IOException(
String.format("Failed to connect to %s within connection timeout",
address));
}
@@ -395,4 +395,12 @@ public class TransportClientFactory implements Closeable {
public TransportContext getContext() {
return context;
}
+
+ private void closeChannel(ChannelFuture channelFuture) {
+ try {
+ channelFuture.channel().close();
+ } catch (Exception e) {
+ logger.warn("Failed to close channel", e);
+ }
+ }
}