This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d1d9217a6 [CELEBORN-1097] Optimize the retrieval of configuration in
the internalCreateClient
d1d9217a6 is described below
commit d1d9217a6d5cbda12a0667f48f5b51422eb53de7
Author: xiyu.zk <[email protected]>
AuthorDate: Tue Oct 31 09:56:34 2023 +0800
[CELEBORN-1097] Optimize the retrieval of configuration in the
internalCreateClient
### What changes were proposed in this pull request?
Optimize the retrieval of configuration in the internalCreateClient
### Why are the changes needed?
Directly accessing configuration information through 'conf.xx' inside
'internalCreateClient' is time-consuming, because each connection attempt
re-resolves the same settings; caching them as final fields at construction
time (connectTimeoutMs, receiveBuf, sendBuf) avoids the repeated lookups.

### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2055 from kerwin-zk/client-factory-conf.
Authored-by: xiyu.zk <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
(cherry picked from commit cf194a5e3a5d17e8dce30e960a345f4d8c38a4ad)
Signed-off-by: Fu Chen <[email protected]>
---
.../flink/network/FlinkTransportClientFactory.java | 6 +----
.../network/client/TransportClientFactory.java | 29 ++++++++++++----------
2 files changed, 17 insertions(+), 18 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index 5492c9505..e9e716ef3 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -42,6 +42,7 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
super(context);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
this.fetchMaxRetries = fetchMaxRetries;
+ this.pooledAllocator = new UnpooledByteBufAllocator(true);
}
public TransportClient createClientWithRetry(String remoteHost, int
remotePort)
@@ -75,11 +76,6 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
remoteHost, remotePort, -1, new
TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
}
- @Override
- protected void initializeMemoryAllocator() {
- this.pooledAllocator = new UnpooledByteBufAllocator(true);
- }
-
public void registerSupplier(long streamId, Supplier<ByteBuf> supplier) {
bufferSuppliers.put(streamId, supplier);
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 1f26240e6..ba1a0dde9 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -68,7 +68,6 @@ public class TransportClientFactory implements Closeable {
private static final Logger logger =
LoggerFactory.getLogger(TransportClientFactory.class);
private final TransportContext context;
- private final TransportConf conf;
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
/** Random number generator for picking connections between peers. */
@@ -76,15 +75,23 @@ public class TransportClientFactory implements Closeable {
private final int numConnectionsPerPeer;
+ private final int connectTimeoutMs;
+
+ private final int receiveBuf;
+
+ private final int sendBuf;
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
protected ByteBufAllocator pooledAllocator;
public TransportClientFactory(TransportContext context) {
this.context = Preconditions.checkNotNull(context);
- this.conf = context.getConf();
+ TransportConf conf = context.getConf();
this.connectionPool = JavaUtils.newConcurrentHashMap();
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
+ this.connectTimeoutMs = conf.connectTimeoutMs();
+ this.receiveBuf = conf.receiveBuf();
+ this.sendBuf = conf.sendBuf();
this.rand = new Random();
IOMode ioMode = IOMode.valueOf(conf.ioMode());
@@ -92,10 +99,6 @@ public class TransportClientFactory implements Closeable {
logger.info("mode " + ioMode + " threads " + conf.clientThreads());
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(),
conf.getModuleName() + "-client");
- initializeMemoryAllocator();
- }
-
- protected void initializeMemoryAllocator() {
this.pooledAllocator = NettyUtils.getPooledByteBufAllocator(conf, null,
false);
}
@@ -213,15 +216,15 @@ public class TransportClientFactory implements Closeable {
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs())
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.option(ChannelOption.ALLOCATOR, pooledAllocator);
- if (conf.receiveBuf() > 0) {
- bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
+ if (receiveBuf > 0) {
+ bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
}
- if (conf.sendBuf() > 0) {
- bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
+ if (sendBuf > 0) {
+ bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf);
}
final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
@@ -239,9 +242,9 @@ public class TransportClientFactory implements Closeable {
// Connect to the remote server
ChannelFuture cf = bootstrap.connect(address);
- if (!cf.await(conf.connectTimeoutMs())) {
+ if (!cf.await(connectTimeoutMs)) {
throw new CelebornIOException(
- String.format("Connecting to %s timed out (%s ms)", address,
conf.connectTimeoutMs()));
+ String.format("Connecting to %s timed out (%s ms)", address,
connectTimeoutMs));
} else if (cf.cause() != null) {
throw new CelebornIOException(String.format("Failed to connect to %s",
address), cf.cause());
}