This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 893085c9c [CELEBORN-1473] TransportClientFactory should register netty
memory metric with source for shared pooled ByteBuf allocator
893085c9c is described below
commit 893085c9c0c3fb78ffa379179a3e69748adf1cd0
Author: SteNicholas <[email protected]>
AuthorDate: Thu Aug 1 23:11:44 2024 +0800
[CELEBORN-1473] TransportClientFactory should register netty memory metric
with source for shared pooled ByteBuf allocator
### What changes were proposed in this pull request?
`TransportClientFactory` registers the netty memory metric with the source of
`TransportContext` for the shared pooled ByteBuf allocator.
Addresses
https://github.com/apache/celeborn/pull/2585#issuecomment-2183720137.
### Why are the changes needed?
The default value of `celeborn.network.memory.allocator.share` is true,
which means that the shared memory allocator is enabled by default. Meanwhile,
`RpcEnv#create` does not create `TransportClientFactory` with a source, which
causes a lack of `NettyMemoryMetrics` for `TransportClientFactory`.
Therefore, `TransportClientFactory` should also register
`NettyMemoryMetrics` with the `WorkerSource` of `TransportContext` to expose the
netty memory metrics of the shared pooled memory.

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `celeborn.network.memory.allocator.allowCache=false`
```
$ curl http://celeborn-worker:9096/metrics|grep metrics_shared_pool_used
metrics_shared_pool_1_usedHeapMemory_Value{hostName="celeborn-worker",role="Worker"}
29360128 1722505389446
metrics_shared_pool_1_usedDirectMemory_Value{hostName="celeborn-worker",role="Worker"}
188743680 1722505389446
```
- `celeborn.network.memory.allocator.allowCache=true`
```
$ curl http://celeborn-worker:9096/metrics|grep metrics_shared_pool_used
metrics_shared_pool_1_usedHeapMemory_Value{hostName="celeborn-worker",role="Worker"}
8388608 1722505646795
metrics_shared_pool_1_usedDirectMemory_Value{hostName="celeborn-worker",role="Worker"}
8388608 1722505646795
metrics_shared_pool_0_usedHeapMemory_Value{hostName="celeborn-worker",role="Worker"}
281018368 1722505646795
metrics_shared_pool_0_usedDirectMemory_Value{hostName="celeborn-worker",role="Worker"}
1438646272 1722505646796
```
Closes #2652 from SteNicholas/CELEBORN-1473.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/common/network/TransportContext.java | 8 ++++++--
.../common/network/client/TransportClientFactory.java | 3 ++-
.../celeborn/common/network/server/TransportServer.java | 6 ++----
.../org/apache/celeborn/common/network/util/NettyUtils.java | 2 +-
.../main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala | 10 +++++++---
.../org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala | 7 ++++++-
.../apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala | 3 +++
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 13 ++++++++-----
8 files changed, 35 insertions(+), 17 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index 869b3fafd..fe6cd5c77 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -130,12 +130,12 @@ public class TransportContext implements Closeable {
/** Create a server which will attempt to bind to a specific host and port.
*/
public TransportServer createServer(String host, int port) {
- return new TransportServer(this, host, port, source, msgHandler,
Collections.emptyList());
+ return new TransportServer(this, host, port, Collections.emptyList());
}
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
- return new TransportServer(this, host, port, source, msgHandler,
bootstraps);
+ return new TransportServer(this, host, port, bootstraps);
}
public TransportServer createServer(List<TransportServerBootstrap>
bootstraps) {
@@ -241,6 +241,10 @@ public class TransportContext implements Closeable {
return msgHandler;
}
+ public AbstractSource getSource() {
+ return source;
+ }
+
@Override
public void close() {
if (sslFactory != null) {
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 620715efa..8dd2380fe 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -112,7 +112,8 @@ public class TransportClientFactory implements Closeable {
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(),
conf.getModuleName() + "-client");
this.pooledAllocator =
- NettyUtils.getPooledByteBufAllocator(conf, null, false,
conf.clientThreads());
+ NettyUtils.getPooledByteBufAllocator(
+ conf, context.getSource(), false, conf.clientThreads());
}
/**
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index f2881cda1..672a450bc 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -58,13 +58,11 @@ public class TransportServer implements Closeable {
TransportContext context,
String hostToBind,
int portToBind,
- AbstractSource source,
- BaseMessageHandler appMessageHandler,
List<TransportServerBootstrap> bootstraps) {
this.context = context;
this.conf = context.getConf();
- this.source = source;
- this.appMessageHandler = Preconditions.checkNotNull(appMessageHandler);
+ this.source = context.getSource();
+ this.appMessageHandler =
Preconditions.checkNotNull(context.getMsgHandler());
this.bootstraps =
Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
boolean shouldClose = true;
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 00c970eb3..0a7641247 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -132,7 +132,7 @@ public class NettyUtils {
if (source != null) {
new NettyMemoryMetrics(
_sharedPooledByteBufAllocator[index],
- "shared-pool",
+ "shared-pool-" + index,
conf.networkAllocatorVerboseMetric(),
source,
Collections.emptyMap());
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 0e7573dc0..f8c51d9e1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -22,6 +22,7 @@ import java.io.File
import scala.concurrent.Future
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory
@@ -49,7 +50,8 @@ object RpcEnv {
port: Int,
conf: CelebornConf,
numUsableCores: Int,
- securityContext: Option[RpcSecurityContext] = None): RpcEnv = {
+ securityContext: Option[RpcSecurityContext] = None,
+ source: Option[AbstractSource] = None): RpcEnv = {
val config =
RpcEnvConfig(
conf,
@@ -59,7 +61,8 @@ object RpcEnv {
advertiseAddress,
port,
numUsableCores,
- securityContext)
+ securityContext,
+ source)
new NettyRpcEnvFactory().create(config)
}
}
@@ -192,7 +195,8 @@ private[celeborn] case class RpcEnvConfig(
advertiseAddress: String,
port: Int,
numUsableCores: Int,
- securityContext: Option[RpcSecurityContext]) {
+ securityContext: Option[RpcSecurityContext],
+ source: Option[AbstractSource]) {
assert(RpcEnvConfig.VALID_TRANSPORT_MODULES.contains(transportModule))
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 8b19ca931..67732620d 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -61,7 +61,12 @@ class NettyRpcEnv(
// Visible for tests
private[netty] val transportContext =
- new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this))
+ new TransportContext(
+ transportConf,
+ new NettyRpcHandler(dispatcher, this),
+ false,
+ false,
+ config.source.orNull)
private def createClientBootstraps():
java.util.List[TransportClientBootstrap] = {
val bootstrapOpt = securityContext.flatMap(_.clientSaslContext.map {
clientSaslContext =>
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
index e2b1f0ea9..a7b1fbfa4 100644
---
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
@@ -50,6 +50,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"localhost",
port,
0,
+ None,
None)
new NettyRpcEnvFactory().create(config)
}
@@ -73,6 +74,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"example.com",
0,
0,
+ None,
None)
val env = new NettyRpcEnvFactory().create(config)
try {
@@ -122,6 +124,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"localhost",
0,
numUsableCores,
+ None,
None)
val anotherEnv = new NettyRpcEnvFactory().create(config)
anotherEnv.setupEndpoint(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index ae15dbc73..d8205aa6f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -25,7 +25,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
import scala.collection.JavaConverters._
-import scala.util.Random
import com.google.common.annotations.VisibleForTesting
import io.netty.util.HashedWheelTimer
@@ -60,7 +59,6 @@ import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingSta
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
import
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter,
StorageManager}
-import org.apache.celeborn.service.deploy.worker.storage.StoragePolicy
private[celeborn] class Worker(
override val conf: CelebornConf,
@@ -101,7 +99,9 @@ private[celeborn] class Worker(
workerArgs.host,
workerArgs.port,
conf,
- Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())))
+ Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+ None,
+ Some(workerSource))
} else {
val externalSecurityContext = new RpcSecurityContextBuilder()
.withServerSaslContext(
@@ -118,7 +118,8 @@ private[celeborn] class Worker(
workerArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
- Some(externalSecurityContext))
+ Some(externalSecurityContext),
+ Some(workerSource))
}
private[worker] var internalRpcEnvInUse =
@@ -132,7 +133,9 @@ private[celeborn] class Worker(
workerArgs.host,
workerArgs.internalPort,
conf,
- Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())))
+ Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+ None,
+ Some(workerSource))
}
private val host = rpcEnv.address.host