This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new d1df84edf [CELEBORN-914][FOLLOWUP] Add aggressive mode to evict memory shuffle file
d1df84edf is described below

commit d1df84edf4ad97c6c9dada9d88021e0d8f89b7c5
Author: mingji <[email protected]>
AuthorDate: Mon Jul 15 10:27:30 2024 +0800

    [CELEBORN-914][FOLLOWUP] Add aggressive mode to evict memory shuffle file
    
    ### What changes were proposed in this pull request?
    Add an aggressive mode to evict memory shuffle files.
    
    ### Why are the changes needed?
    To evict more shuffle files to reduce memory pressure.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA (GitHub Actions CI).
    
    Closes #2602 from FMX/b914-4.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 04c4b857a1b4e4676f1fa4fda4449cd92b1a2e17)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 23 ++++++++++++++++++++++
 docs/configuration/worker.md                       |  2 ++
 .../deploy/worker/memory/MemoryManager.java        | 14 +++++++++----
 3 files changed, 35 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e4e7b9026..a303833ce 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1206,6 +1206,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     get(WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE)
   def workerMemoryFileStorageMaxFileSize: Long =
     get(WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE)
+  def workerMemoryFileStorageEictAggressiveModeEnabled: Boolean =
+    get(WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED)
+  def workerMemoryFileStorageEvictRatio: Double =
+    get(WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO)
 
   // //////////////////////////////////////////////////////
   //                  Rate Limit controller              //
@@ -3268,6 +3272,25 @@ object CelebornConf extends Logging {
       .checkValue(v => v < Int.MaxValue, "A single memory storage file can not 
be larger than 2GB")
       .createWithDefaultString("8MB")
 
+  val WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED: 
ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled")
+      .categories("worker")
+      .doc(
+        "If this set to true, memory shuffle files will be evicted when worker 
is in PAUSED state." +
+          " If the worker's offheap memory is not ample, set this to true " +
+          "and decrease 
`celeborn.worker.directMemoryRatioForMemoryFileStorage` will be helpful.")
+      .version("0.5.1")
+      .booleanConf
+      .createWithDefault(false)
+
+  val WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO: ConfigEntry[Double] =
+    buildConf("celeborn.worker.memoryFileStorage.evict.ratio")
+      .categories("worker")
+      .doc("If memory shuffle storage usage rate is above this config, the 
memory storage shuffle files will evict to free memory.")
+      .version("0.5.1")
+      .doubleConf
+      .createWithDefault(0.5)
+
   val WORKER_CONGESTION_CONTROL_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.congestionControl.enabled")
       .categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ee6756131..0766e8a42 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -102,6 +102,8 @@ license: |
 | celeborn.worker.jvmQuake.exitCode | 502 | false | The exit code of system 
kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.kill.threshold | 60s | false | The threshold of 
system kill for the maximum GC 'deficit' which can be accumulated before 
jvmquake takes action. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.runtimeWeight | 5.0 | false | The factor by which 
to multiply running JVM time, when weighing it against GCing time. 'Deficit' is 
accumulated as `gc_time - runtime * runtime_weight`, and is compared against 
threshold to determine whether to take action. | 0.4.0 |  | 
+| celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled | false | 
false | If this set to true, memory shuffle files will be evicted when worker 
is in PAUSED state. If the worker's offheap memory is not ample, set this to 
true and decrease `celeborn.worker.directMemoryRatioForMemoryFileStorage` will 
be helpful. | 0.5.1 |  | 
+| celeborn.worker.memoryFileStorage.evict.ratio | 0.5 | false | If memory 
shuffle storage usage rate is above this config, the memory storage shuffle 
files will evict to free memory. | 0.5.1 |  | 
 | celeborn.worker.memoryFileStorage.maxFileSize | 8MB | false | Max size for a 
memory storage file. It must be less than 2GB. | 0.5.0 |  | 
 | celeborn.worker.monitor.disk.check.interval | 30s | false | Intervals 
between device monitor to check disk. | 0.3.0 | 
celeborn.worker.monitor.disk.checkInterval | 
 | celeborn.worker.monitor.disk.check.timeout | 30s | false | Timeout time for 
worker check device status. | 0.3.0 | celeborn.worker.disk.check.timeout | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index ae25060c3..c8b6af9af 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -126,6 +126,8 @@ public class MemoryManager {
     double readBufferTargetRatio = conf.readBufferTargetRatio();
     long readBufferTargetUpdateInterval = 
conf.readBufferTargetUpdateInterval();
     long readBufferTargetNotifyThreshold = 
conf.readBufferTargetNotifyThreshold();
+    boolean aggressiveEvictModeEnabled = 
conf.workerMemoryFileStorageEictAggressiveModeEnabled();
+    double evictRatio = conf.workerMemoryFileStorageEvictRatio();
     forceAppendPauseSpentTimeThreshold = 
conf.metricsWorkerForceAppendPauseSpentTimeThreshold();
     maxDirectMemory =
         DynMethods.builder("maxDirectMemory")
@@ -232,8 +234,7 @@ public class MemoryManager {
       memoryFileStorageService.scheduleWithFixedDelay(
           () -> {
             try {
-              if ((memoryFileStorageCounter.sum() >= 0.5 * 
memoryFileStorageThreshold)
-                  && currentServingState() != ServingState.NONE_PAUSED) {
+              if (shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
                 List<PartitionDataWriter> memoryWriters =
                     new ArrayList<>(storageManager.memoryWriters().values());
                 if (memoryWriters.isEmpty()) {
@@ -247,8 +248,7 @@ public class MemoryManager {
                 try {
                   for (PartitionDataWriter writer : memoryWriters) {
                     // this branch means that there is no memory pressure
-                    if ((memoryFileStorageCounter.sum() < 0.5 * 
memoryFileStorageThreshold)
-                        || currentServingState() == ServingState.NONE_PAUSED) {
+                    if (!shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
                       break;
                     }
                     logger.debug("Evict writer {}", writer);
@@ -283,6 +283,12 @@ public class MemoryManager {
         resumeRatio);
   }
 
+  public boolean shouldEvict(boolean aggressiveMemoryFileEvictEnabled, double 
evictRatio) {
+    return currentServingState() != ServingState.NONE_PAUSED
+        && (aggressiveMemoryFileEvictEnabled
+            || (memoryFileStorageCounter.sum() >= evictRatio * 
memoryFileStorageThreshold));
+  }
+
   public ServingState currentServingState() {
     long memoryUsage = getMemoryUsage();
     // pause replicate threshold always greater than pause push data threshold

Reply via email to