This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 893085c9c [CELEBORN-1473] TransportClientFactory should register netty 
memory metric with source for shared pooled ByteBuf allocator
893085c9c is described below

commit 893085c9c0c3fb78ffa379179a3e69748adf1cd0
Author: SteNicholas <[email protected]>
AuthorDate: Thu Aug 1 23:11:44 2024 +0800

    [CELEBORN-1473] TransportClientFactory should register netty memory metric 
with source for shared pooled ByteBuf allocator
    
    ### What changes were proposed in this pull request?
    
    `TransportClientFactory` registers the Netty memory metrics with the source of
    `TransportContext` for the shared pooled ByteBuf allocator.
    
    Addresses
    https://github.com/apache/celeborn/pull/2585#issuecomment-2183720137.
    
    ### Why are the changes needed?
    
    The default value of `celeborn.network.memory.allocator.share` is true,
    which means the shared memory allocator is enabled by default. Meanwhile,
    `RpcEnv#create` does not create `TransportClientFactory` with a source, which
    causes `NettyMemoryMetric` to be missing for `TransportClientFactory`.
    
    Therefore, `TransportClientFactory` should also register
    `NettyMemoryMetric` with the `WorkerSource` of `TransportContext`, so that the
    Netty memory metrics of the shared pooled allocator are reported.
    
    
![Untitled-2024-06-23-0500](https://github.com/user-attachments/assets/e527be5f-d155-4129-ad49-31a351616b58)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - `celeborn.network.memory.allocator.allowCache=false`
    ```
    $ curl http://celeborn-worker:9096/metrics|grep metrics_shared_pool_used
    metrics_shared_pool_1_usedHeapMemory_Value{hostName="celeborn-worker",role="Worker"} 29360128 1722505389446
    metrics_shared_pool_1_usedDirectMemory_Value{hostName="celeborn-worker",role="Worker"} 188743680 1722505389446
    ```
    
    - `celeborn.network.memory.allocator.allowCache=true`
    ```
    $ curl http://celeborn-worker:9096/metrics|grep metrics_shared_pool_used
    metrics_shared_pool_1_usedHeapMemory_Value{hostName="celeborn-worker",role="Worker"} 8388608 1722505646795
    metrics_shared_pool_1_usedDirectMemory_Value{hostName="celeborn-worker",role="Worker"} 8388608 1722505646795
    metrics_shared_pool_0_usedHeapMemory_Value{hostName="celeborn-worker",role="Worker"} 281018368 1722505646795
    metrics_shared_pool_0_usedDirectMemory_Value{hostName="celeborn-worker",role="Worker"} 1438646272 1722505646796
    ```
    
    Closes #2652 from SteNicholas/CELEBORN-1473.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/common/network/TransportContext.java    |  8 ++++++--
 .../common/network/client/TransportClientFactory.java       |  3 ++-
 .../celeborn/common/network/server/TransportServer.java     |  6 ++----
 .../org/apache/celeborn/common/network/util/NettyUtils.java |  2 +-
 .../main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala  | 10 +++++++---
 .../org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala  |  7 ++++++-
 .../apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala |  3 +++
 .../org/apache/celeborn/service/deploy/worker/Worker.scala  | 13 ++++++++-----
 8 files changed, 35 insertions(+), 17 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index 869b3fafd..fe6cd5c77 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -130,12 +130,12 @@ public class TransportContext implements Closeable {
 
   /** Create a server which will attempt to bind to a specific host and port. 
*/
   public TransportServer createServer(String host, int port) {
-    return new TransportServer(this, host, port, source, msgHandler, 
Collections.emptyList());
+    return new TransportServer(this, host, port, Collections.emptyList());
   }
 
   public TransportServer createServer(
       String host, int port, List<TransportServerBootstrap> bootstraps) {
-    return new TransportServer(this, host, port, source, msgHandler, 
bootstraps);
+    return new TransportServer(this, host, port, bootstraps);
   }
 
   public TransportServer createServer(List<TransportServerBootstrap> 
bootstraps) {
@@ -241,6 +241,10 @@ public class TransportContext implements Closeable {
     return msgHandler;
   }
 
+  public AbstractSource getSource() {
+    return source;
+  }
+
   @Override
   public void close() {
     if (sslFactory != null) {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 620715efa..8dd2380fe 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -112,7 +112,8 @@ public class TransportClientFactory implements Closeable {
     this.workerGroup =
         NettyUtils.createEventLoop(ioMode, conf.clientThreads(), 
conf.getModuleName() + "-client");
     this.pooledAllocator =
-        NettyUtils.getPooledByteBufAllocator(conf, null, false, 
conf.clientThreads());
+        NettyUtils.getPooledByteBufAllocator(
+            conf, context.getSource(), false, conf.clientThreads());
   }
 
   /**
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index f2881cda1..672a450bc 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -58,13 +58,11 @@ public class TransportServer implements Closeable {
       TransportContext context,
       String hostToBind,
       int portToBind,
-      AbstractSource source,
-      BaseMessageHandler appMessageHandler,
       List<TransportServerBootstrap> bootstraps) {
     this.context = context;
     this.conf = context.getConf();
-    this.source = source;
-    this.appMessageHandler = Preconditions.checkNotNull(appMessageHandler);
+    this.source = context.getSource();
+    this.appMessageHandler = 
Preconditions.checkNotNull(context.getMsgHandler());
     this.bootstraps = 
Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
     boolean shouldClose = true;
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 00c970eb3..0a7641247 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -132,7 +132,7 @@ public class NettyUtils {
       if (source != null) {
         new NettyMemoryMetrics(
             _sharedPooledByteBufAllocator[index],
-            "shared-pool",
+            "shared-pool-" + index,
             conf.networkAllocatorVerboseMetric(),
             source,
             Collections.emptyMap());
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 0e7573dc0..f8c51d9e1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -22,6 +22,7 @@ import java.io.File
 import scala.concurrent.Future
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.protocol.TransportModuleConstants
 import org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory
 
@@ -49,7 +50,8 @@ object RpcEnv {
       port: Int,
       conf: CelebornConf,
       numUsableCores: Int,
-      securityContext: Option[RpcSecurityContext] = None): RpcEnv = {
+      securityContext: Option[RpcSecurityContext] = None,
+      source: Option[AbstractSource] = None): RpcEnv = {
     val config =
       RpcEnvConfig(
         conf,
@@ -59,7 +61,8 @@ object RpcEnv {
         advertiseAddress,
         port,
         numUsableCores,
-        securityContext)
+        securityContext,
+        source)
     new NettyRpcEnvFactory().create(config)
   }
 }
@@ -192,7 +195,8 @@ private[celeborn] case class RpcEnvConfig(
     advertiseAddress: String,
     port: Int,
     numUsableCores: Int,
-    securityContext: Option[RpcSecurityContext]) {
+    securityContext: Option[RpcSecurityContext],
+    source: Option[AbstractSource]) {
   assert(RpcEnvConfig.VALID_TRANSPORT_MODULES.contains(transportModule))
 }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 8b19ca931..67732620d 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -61,7 +61,12 @@ class NettyRpcEnv(
 
   // Visible for tests
   private[netty] val transportContext =
-    new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this))
+    new TransportContext(
+      transportConf,
+      new NettyRpcHandler(dispatcher, this),
+      false,
+      false,
+      config.source.orNull)
 
   private def createClientBootstraps(): 
java.util.List[TransportClientBootstrap] = {
     val bootstrapOpt = securityContext.flatMap(_.clientSaslContext.map { 
clientSaslContext =>
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
 
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
index e2b1f0ea9..a7b1fbfa4 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
@@ -50,6 +50,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       "localhost",
       port,
       0,
+      None,
       None)
     new NettyRpcEnvFactory().create(config)
   }
@@ -73,6 +74,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       "example.com",
       0,
       0,
+      None,
       None)
     val env = new NettyRpcEnvFactory().create(config)
     try {
@@ -122,6 +124,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       "localhost",
       0,
       numUsableCores,
+      None,
       None)
     val anotherEnv = new NettyRpcEnvFactory().create(config)
     anotherEnv.setupEndpoint(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index ae15dbc73..d8205aa6f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -25,7 +25,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
 import scala.collection.JavaConverters._
-import scala.util.Random
 
 import com.google.common.annotations.VisibleForTesting
 import io.netty.util.HashedWheelTimer
@@ -60,7 +59,6 @@ import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingSta
 import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
 import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
 import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, 
StorageManager}
-import org.apache.celeborn.service.deploy.worker.storage.StoragePolicy
 
 private[celeborn] class Worker(
     override val conf: CelebornConf,
@@ -101,7 +99,9 @@ private[celeborn] class Worker(
         workerArgs.host,
         workerArgs.port,
         conf,
-        Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())))
+        Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+        None,
+        Some(workerSource))
     } else {
       val externalSecurityContext = new RpcSecurityContextBuilder()
         .withServerSaslContext(
@@ -118,7 +118,8 @@ private[celeborn] class Worker(
         workerArgs.port,
         conf,
         Math.max(64, Runtime.getRuntime.availableProcessors()),
-        Some(externalSecurityContext))
+        Some(externalSecurityContext),
+        Some(workerSource))
     }
 
   private[worker] var internalRpcEnvInUse =
@@ -132,7 +133,9 @@ private[celeborn] class Worker(
         workerArgs.host,
         workerArgs.internalPort,
         conf,
-        Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())))
+        Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+        None,
+        Some(workerSource))
     }
 
   private val host = rpcEnv.address.host

Reply via email to