This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch 0.3.1-speed
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit e251d9475f4a7cec5d88b588bf1751b861fba7b5
Author: xiyu.zk <[email protected]>
AuthorDate: Fri Oct 27 11:11:14 2023 +0800

    test
---
 .../network/client/TransportClientFactory.java     | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 1f26240e6..afebaa8fc 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -76,6 +76,11 @@ public class TransportClientFactory implements Closeable {
 
   private final int numConnectionsPerPeer;
 
+  private final int connectTimeoutMs;
+
+  private final int receiveBuf;
+
+  private final int sendBuf;
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
   protected ByteBufAllocator pooledAllocator;
@@ -85,6 +90,9 @@ public class TransportClientFactory implements Closeable {
     this.conf = context.getConf();
     this.connectionPool = JavaUtils.newConcurrentHashMap();
     this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
+    this.connectTimeoutMs = conf.connectTimeoutMs();
+    this.receiveBuf = conf.receiveBuf();
+    this.sendBuf = conf.sendBuf();
     this.rand = new Random();
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
@@ -213,15 +221,15 @@ public class TransportClientFactory implements Closeable {
         // Disable Nagle's Algorithm since we don't want packets to wait
         .option(ChannelOption.TCP_NODELAY, true)
         .option(ChannelOption.SO_KEEPALIVE, true)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs())
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
         .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
-    if (conf.receiveBuf() > 0) {
-      bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
+    if (receiveBuf > 0) {
+      bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
     }
 
-    if (conf.sendBuf() > 0) {
-      bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
+    if (sendBuf > 0) {
+      bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf);
     }
 
     final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
@@ -239,9 +247,9 @@ public class TransportClientFactory implements Closeable {
 
     // Connect to the remote server
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.await(conf.connectTimeoutMs())) {
+    if (!cf.await(connectTimeoutMs)) {
       throw new CelebornIOException(
-          String.format("Connecting to %s timed out (%s ms)", address, conf.connectTimeoutMs()));
+          String.format("Connecting to %s timed out (%s ms)", address, connectTimeoutMs));
     } else if (cf.cause() != null) {
       throw new CelebornIOException(String.format("Failed to connect to %s", address), cf.cause());
     }

Reply via email to