This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new ee8c6d6ce [CELEBORN-750] Simplify get Netty used direct memory logic
ee8c6d6ce is described below

commit ee8c6d6ce47450d6dc9fb8f4f12bb62a5508352c
Author: Fu Chen <[email protected]>
AuthorDate: Thu Jun 29 18:24:16 2023 +0800

    [CELEBORN-750] Simplify get Netty used direct memory logic
    
    ### What changes were proposed in this pull request?
    
    Netty has exposed the public API `PlatformDependent.usedDirectMemory()` for 
reading Netty's used direct memory since 
[netty-4.1.35.Final](https://github.com/netty/netty/pull/8945). This PR uses 
that API to simplify the logic.
    
    ### Why are the changes needed?
    
    Simplifies the logic for getting Netty's used direct memory.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #1662 from cfmcgrady/netty-used-memory.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit baa0d0b3b4d41701f1573c1a6b156008e578b838)
    Signed-off-by: mingji <[email protected]>
---
 .../deploy/worker/memory/MemoryManager.java        | 31 +++++-----------------
 .../celeborn/service/deploy/worker/Worker.scala    |  2 +-
 2 files changed, 7 insertions(+), 26 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 0509a0b0a..2e0d914bf 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.celeborn.service.deploy.worker.memory;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -65,7 +64,6 @@ public class MemoryManager {
 
   private AtomicBoolean trimInProcess = new AtomicBoolean(false);
 
-  private AtomicLong nettyMemoryCounter = null;
   private final AtomicLong sortMemoryCounter = new AtomicLong(0);
   private final AtomicLong diskBufferCounter = new AtomicLong(0);
   private final LongAdder pausePushDataCounter = new LongAdder();
@@ -154,8 +152,6 @@ public class MemoryManager {
     readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
     memoryShuffleStorageThreshold = (long) (maxDirectorMemory * 
shuffleStorageRatio);
 
-    initDirectMemoryIndicator();
-
     checkService.scheduleWithFixedDelay(
         () -> {
           try {
@@ -207,7 +203,7 @@ public class MemoryManager {
         () ->
             logger.info(
                 "Direct memory usage: {}/{}, disk buffer size: {}, sort memory 
size: {}, read buffer size: {}",
-                Utils.bytesToString(nettyMemoryCounter.get()),
+                Utils.bytesToString(getNettyUsedDirectMemory()),
                 Utils.bytesToString(maxDirectorMemory),
                 Utils.bytesToString(diskBufferCounter.get()),
                 Utils.bytesToString(sortMemoryCounter.get()),
@@ -268,23 +264,6 @@ public class MemoryManager {
         Utils.bytesToString(memoryShuffleStorageThreshold));
   }
 
-  private void initDirectMemoryIndicator() {
-    try {
-      Field field = null;
-      Field[] result = PlatformDependent.class.getDeclaredFields();
-      for (Field tf : result) {
-        if ("DIRECT_MEMORY_COUNTER".equals(tf.getName())) {
-          field = tf;
-        }
-      }
-      field.setAccessible(true);
-      nettyMemoryCounter = ((AtomicLong) field.get(PlatformDependent.class));
-    } catch (Exception e) {
-      logger.error("Fatal error, get netty_direct_memory failed, worker should 
stop", e);
-      System.exit(-1);
-    }
-  }
-
   public MemoryManagerStat currentMemoryAction() {
     long memoryUsage = getMemoryUsage();
     boolean pausePushData = memoryUsage > pausePushDataThreshold;
@@ -350,12 +329,14 @@ public class MemoryManager {
     diskBufferCounter.addAndGet(size * -1);
   }
 
-  public AtomicLong getNettyMemoryCounter() {
-    return nettyMemoryCounter;
+  public long getNettyUsedDirectMemory() {
+    long usedDirectMemory = PlatformDependent.usedDirectMemory();
+    assert usedDirectMemory != -1;
+    return usedDirectMemory;
   }
 
   public long getMemoryUsage() {
-    return nettyMemoryCounter.get() + sortMemoryCounter.get();
+    return getNettyUsedDirectMemory() + sortMemoryCounter.get();
   }
 
   public AtomicLong getSortMemoryCounter() {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 550adb05a..9d1d9f1dc 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -254,7 +254,7 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.SortedFiles, _ => 
partitionsSorter.getSortedCount)
   workerSource.addGauge(WorkerSource.SortedFileSize, _ => 
partitionsSorter.getSortedSize)
   workerSource.addGauge(WorkerSource.DiskBuffer, _ => 
memoryManager.getDiskBufferCounter.get())
-  workerSource.addGauge(WorkerSource.NettyMemory, _ => 
memoryManager.getNettyMemoryCounter.get())
+  workerSource.addGauge(WorkerSource.NettyMemory, _ => 
memoryManager.getNettyUsedDirectMemory())
   workerSource.addGauge(WorkerSource.PausePushDataCount, _ => 
memoryManager.getPausePushDataCounter)
   workerSource.addGauge(
     WorkerSource.PausePushDataAndReplicateCount,

Reply via email to