This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch 0.3.1-speed in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit e251d9475f4a7cec5d88b588bf1751b861fba7b5 Author: xiyu.zk <[email protected]> AuthorDate: Fri Oct 27 11:11:14 2023 +0800 test --- .../network/client/TransportClientFactory.java | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java index 1f26240e6..afebaa8fc 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java @@ -76,6 +76,11 @@ public class TransportClientFactory implements Closeable { private final int numConnectionsPerPeer; + private final int connectTimeoutMs; + + private final int receiveBuf; + + private final int sendBuf; private final Class<? extends Channel> socketChannelClass; private EventLoopGroup workerGroup; protected ByteBufAllocator pooledAllocator; @@ -85,6 +90,9 @@ public class TransportClientFactory implements Closeable { this.conf = context.getConf(); this.connectionPool = JavaUtils.newConcurrentHashMap(); this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); + this.connectTimeoutMs = conf.connectTimeoutMs(); + this.receiveBuf = conf.receiveBuf(); + this.sendBuf = conf.sendBuf(); this.rand = new Random(); IOMode ioMode = IOMode.valueOf(conf.ioMode()); @@ -213,15 +221,15 @@ public class TransportClientFactory implements Closeable { // Disable Nagle's Algorithm since we don't want packets to wait .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) .option(ChannelOption.ALLOCATOR, pooledAllocator); - if (conf.receiveBuf() > 0) { - bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf()); + if (receiveBuf > 0) { + bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf); } - if (conf.sendBuf() > 0) { - bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf()); + if (sendBuf > 0) { + bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf); } final AtomicReference<TransportClient> clientRef = new AtomicReference<>(); @@ -239,9 +247,9 @@ public class TransportClientFactory implements Closeable { // Connect to the remote server ChannelFuture cf = bootstrap.connect(address); - if (!cf.await(conf.connectTimeoutMs())) { + if (!cf.await(connectTimeoutMs)) { throw new CelebornIOException( - String.format("Connecting to %s timed out (%s ms)", address, conf.connectTimeoutMs())); + String.format("Connecting to %s timed out (%s ms)", address, connectTimeoutMs)); } else if (cf.cause() != null) { throw new CelebornIOException(String.format("Failed to connect to %s", address), cf.cause()); }
