This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new ee8c6d6ce [CELEBORN-750] Simplify get Netty used direct memory logic
ee8c6d6ce is described below
commit ee8c6d6ce47450d6dc9fb8f4f12bb62a5508352c
Author: Fu Chen <[email protected]>
AuthorDate: Thu Jun 29 18:24:16 2023 +0800
[CELEBORN-750] Simplify get Netty used direct memory logic
### What changes were proposed in this pull request?
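Netty has exposed the public API `PlatformDependent.usedDirectMemory()` for
getting the direct memory used by Netty since
[netty-4.1.35.Final](https://github.com/netty/netty/pull/8945). This PR uses
that API to simplify the logic, replacing the reflective lookup of Netty's
private `DIRECT_MEMORY_COUNTER` field.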
### Why are the changes needed?
Simplifies the logic for getting the direct memory used by Netty.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #1662 from cfmcgrady/netty-used-memory.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit baa0d0b3b4d41701f1573c1a6b156008e578b838)
Signed-off-by: mingji <[email protected]>
---
.../deploy/worker/memory/MemoryManager.java | 31 +++++-----------------
.../celeborn/service/deploy/worker/Worker.scala | 2 +-
2 files changed, 7 insertions(+), 26 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 0509a0b0a..2e0d914bf 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -17,7 +17,6 @@
package org.apache.celeborn.service.deploy.worker.memory;
-import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -65,7 +64,6 @@ public class MemoryManager {
private AtomicBoolean trimInProcess = new AtomicBoolean(false);
- private AtomicLong nettyMemoryCounter = null;
private final AtomicLong sortMemoryCounter = new AtomicLong(0);
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
@@ -154,8 +152,6 @@ public class MemoryManager {
readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
memoryShuffleStorageThreshold = (long) (maxDirectorMemory *
shuffleStorageRatio);
- initDirectMemoryIndicator();
-
checkService.scheduleWithFixedDelay(
() -> {
try {
@@ -207,7 +203,7 @@ public class MemoryManager {
() ->
logger.info(
"Direct memory usage: {}/{}, disk buffer size: {}, sort memory
size: {}, read buffer size: {}",
- Utils.bytesToString(nettyMemoryCounter.get()),
+ Utils.bytesToString(getNettyUsedDirectMemory()),
Utils.bytesToString(maxDirectorMemory),
Utils.bytesToString(diskBufferCounter.get()),
Utils.bytesToString(sortMemoryCounter.get()),
@@ -268,23 +264,6 @@ public class MemoryManager {
Utils.bytesToString(memoryShuffleStorageThreshold));
}
- private void initDirectMemoryIndicator() {
- try {
- Field field = null;
- Field[] result = PlatformDependent.class.getDeclaredFields();
- for (Field tf : result) {
- if ("DIRECT_MEMORY_COUNTER".equals(tf.getName())) {
- field = tf;
- }
- }
- field.setAccessible(true);
- nettyMemoryCounter = ((AtomicLong) field.get(PlatformDependent.class));
- } catch (Exception e) {
- logger.error("Fatal error, get netty_direct_memory failed, worker should
stop", e);
- System.exit(-1);
- }
- }
-
public MemoryManagerStat currentMemoryAction() {
long memoryUsage = getMemoryUsage();
boolean pausePushData = memoryUsage > pausePushDataThreshold;
@@ -350,12 +329,14 @@ public class MemoryManager {
diskBufferCounter.addAndGet(size * -1);
}
- public AtomicLong getNettyMemoryCounter() {
- return nettyMemoryCounter;
+ public long getNettyUsedDirectMemory() {
+ long usedDirectMemory = PlatformDependent.usedDirectMemory();
+ assert usedDirectMemory != -1;
+ return usedDirectMemory;
}
public long getMemoryUsage() {
- return nettyMemoryCounter.get() + sortMemoryCounter.get();
+ return getNettyUsedDirectMemory() + sortMemoryCounter.get();
}
public AtomicLong getSortMemoryCounter() {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 550adb05a..9d1d9f1dc 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -254,7 +254,7 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.SortedFiles, _ =>
partitionsSorter.getSortedCount)
workerSource.addGauge(WorkerSource.SortedFileSize, _ =>
partitionsSorter.getSortedSize)
workerSource.addGauge(WorkerSource.DiskBuffer, _ =>
memoryManager.getDiskBufferCounter.get())
- workerSource.addGauge(WorkerSource.NettyMemory, _ =>
memoryManager.getNettyMemoryCounter.get())
+ workerSource.addGauge(WorkerSource.NettyMemory, _ =>
memoryManager.getNettyUsedDirectMemory())
workerSource.addGauge(WorkerSource.PausePushDataCount, _ =>
memoryManager.getPausePushDataCounter)
workerSource.addGauge(
WorkerSource.PausePushDataAndReplicateCount,