This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new d1df84edf [CELEBORN-914][FOLLOWUP] Add aggressive mode to evict memory shuffle file
d1df84edf is described below
commit d1df84edf4ad97c6c9dada9d88021e0d8f89b7c5
Author: mingji <[email protected]>
AuthorDate: Mon Jul 15 10:27:30 2024 +0800
[CELEBORN-914][FOLLOWUP] Add aggressive mode to evict memory shuffle file
### What changes were proposed in this pull request?
Add an aggressive mode to evict memory shuffle files.
### Why are the changes needed?
To evict more shuffle files to reduce memory pressure.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #2602 from FMX/b914-4.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 04c4b857a1b4e4676f1fa4fda4449cd92b1a2e17)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 23 ++++++++++++++++++++++
docs/configuration/worker.md | 2 ++
.../deploy/worker/memory/MemoryManager.java | 14 +++++++++----
3 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e4e7b9026..a303833ce 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1206,6 +1206,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE)
def workerMemoryFileStorageMaxFileSize: Long =
get(WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE)
+ def workerMemoryFileStorageEictAggressiveModeEnabled: Boolean =
+ get(WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED)
+ def workerMemoryFileStorageEvictRatio: Double =
+ get(WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO)
// //////////////////////////////////////////////////////
// Rate Limit controller //
@@ -3268,6 +3272,25 @@ object CelebornConf extends Logging {
.checkValue(v => v < Int.MaxValue, "A single memory storage file can not
be larger than 2GB")
.createWithDefaultString("8MB")
+ val WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED:
ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled")
+ .categories("worker")
+ .doc(
+ "If this set to true, memory shuffle files will be evicted when worker
is in PAUSED state." +
+ " If the worker's offheap memory is not ample, set this to true " +
+ "and decrease
`celeborn.worker.directMemoryRatioForMemoryFileStorage` will be helpful.")
+ .version("0.5.1")
+ .booleanConf
+ .createWithDefault(false)
+
+ val WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO: ConfigEntry[Double] =
+ buildConf("celeborn.worker.memoryFileStorage.evict.ratio")
+ .categories("worker")
+ .doc("If memory shuffle storage usage rate is above this config, the
memory storage shuffle files will evict to free memory.")
+ .version("0.5.1")
+ .doubleConf
+ .createWithDefault(0.5)
+
val WORKER_CONGESTION_CONTROL_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.congestionControl.enabled")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ee6756131..0766e8a42 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -102,6 +102,8 @@ license: |
| celeborn.worker.jvmQuake.exitCode | 502 | false | The exit code of system
kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.kill.threshold | 60s | false | The threshold of
system kill for the maximum GC 'deficit' which can be accumulated before
jvmquake takes action. | 0.4.0 | |
| celeborn.worker.jvmQuake.runtimeWeight | 5.0 | false | The factor by which
to multiply running JVM time, when weighing it against GCing time. 'Deficit' is
accumulated as `gc_time - runtime * runtime_weight`, and is compared against
threshold to determine whether to take action. | 0.4.0 | |
+| celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled | false | false | If this set to true, memory shuffle files will be evicted when worker is in PAUSED state. If the worker's offheap memory is not ample, set this to true and decrease `celeborn.worker.directMemoryRatioForMemoryFileStorage` will be helpful. | 0.5.1 | |
+| celeborn.worker.memoryFileStorage.evict.ratio | 0.5 | false | If memory shuffle storage usage rate is above this config, the memory storage shuffle files will evict to free memory. | 0.5.1 | |
| celeborn.worker.memoryFileStorage.maxFileSize | 8MB | false | Max size for a
memory storage file. It must be less than 2GB. | 0.5.0 | |
| celeborn.worker.monitor.disk.check.interval | 30s | false | Intervals
between device monitor to check disk. | 0.3.0 |
celeborn.worker.monitor.disk.checkInterval |
| celeborn.worker.monitor.disk.check.timeout | 30s | false | Timeout time for
worker check device status. | 0.3.0 | celeborn.worker.disk.check.timeout |
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index ae25060c3..c8b6af9af 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -126,6 +126,8 @@ public class MemoryManager {
double readBufferTargetRatio = conf.readBufferTargetRatio();
long readBufferTargetUpdateInterval =
conf.readBufferTargetUpdateInterval();
long readBufferTargetNotifyThreshold =
conf.readBufferTargetNotifyThreshold();
+ boolean aggressiveEvictModeEnabled =
conf.workerMemoryFileStorageEictAggressiveModeEnabled();
+ double evictRatio = conf.workerMemoryFileStorageEvictRatio();
forceAppendPauseSpentTimeThreshold =
conf.metricsWorkerForceAppendPauseSpentTimeThreshold();
maxDirectMemory =
DynMethods.builder("maxDirectMemory")
@@ -232,8 +234,7 @@ public class MemoryManager {
memoryFileStorageService.scheduleWithFixedDelay(
() -> {
try {
- if ((memoryFileStorageCounter.sum() >= 0.5 *
memoryFileStorageThreshold)
- && currentServingState() != ServingState.NONE_PAUSED) {
+ if (shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
List<PartitionDataWriter> memoryWriters =
new ArrayList<>(storageManager.memoryWriters().values());
if (memoryWriters.isEmpty()) {
@@ -247,8 +248,7 @@ public class MemoryManager {
try {
for (PartitionDataWriter writer : memoryWriters) {
// this branch means that there is no memory pressure
- if ((memoryFileStorageCounter.sum() < 0.5 *
memoryFileStorageThreshold)
- || currentServingState() == ServingState.NONE_PAUSED) {
+ if (!shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
break;
}
logger.debug("Evict writer {}", writer);
@@ -283,6 +283,12 @@ public class MemoryManager {
resumeRatio);
}
+ public boolean shouldEvict(boolean aggressiveMemoryFileEvictEnabled, double
evictRatio) {
+ return currentServingState() != ServingState.NONE_PAUSED
+ && (aggressiveMemoryFileEvictEnabled
+ || (memoryFileStorageCounter.sum() >= evictRatio *
memoryFileStorageThreshold));
+ }
+
public ServingState currentServingState() {
long memoryUsage = getMemoryUsage();
// pause replicate threshold always greater than pause push data threshold