This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c6c319d86 [CELEBORN-1309][FOLLOWUP] Cap the max memory can be used for 
sort buffer
c6c319d86 is described below

commit c6c319d86558f21b8d01bccb8cac4603d06b1a3e
Author: CodingCat <[email protected]>
AuthorDate: Mon Mar 25 12:08:04 2024 +0800

    [CELEBORN-1309][FOLLOWUP] Cap the max memory can be used for sort buffer
    
    ### What changes were proposed in this pull request?
    
    add a new parameter to cap the max memory can be used for sort writer buffer
    
    ### Why are the changes needed?
    
    with a huge number of partitions, the threshold based on buffer size * 
number of partitions without this cap can be too large, e.g. 64K * 100000 = 6G
    
    ### Does this PR introduce _any_ user-facing change?
    
    a new parameter
    
    ### How was this patch tested?
    
    ut
    
    Closes #2388 from CodingCat/adaptive_followup.
    
    Lead-authored-by: CodingCat <[email protected]>
    Co-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../spark/shuffle/celeborn/SortBasedPusher.java       | 13 ++++++-------
 .../org/apache/celeborn/common/CelebornConf.scala     | 19 +++++++++++++++----
 docs/configuration/client.md                          |  5 +++--
 3 files changed, 24 insertions(+), 13 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index b8d1ab00d..7147df6f3 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -49,9 +49,8 @@ public class SortBasedPusher extends MemoryConsumer {
     private final long maxMemoryThresholdInBytes;
     private final double smallPushTolerateFactor;
 
-    MemoryThresholdManager(
-        int numPartitions, long sendBufferSizeInBytes, double 
smallPushTolerateFactor) {
-      this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+    MemoryThresholdManager(double maxMemoryFactor, double 
smallPushTolerateFactor) {
+      this.maxMemoryThresholdInBytes = (long) 
(Runtime.getRuntime().maxMemory() * maxMemoryFactor);
       this.smallPushTolerateFactor = smallPushTolerateFactor;
     }
 
@@ -131,6 +130,8 @@ public class SortBasedPusher extends MemoryConsumer {
 
   private final boolean useAdaptiveThreshold;
 
+  private final double maxMemoryFactor;
+
   public SortBasedPusher(
       TaskMemoryManager memoryManager,
       ShuffleClient shuffleClient,
@@ -192,13 +193,11 @@ public class SortBasedPusher extends MemoryConsumer {
 
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
     useAdaptiveThreshold = conf.clientPushSortUseAdaptiveMemoryThreshold();
+    maxMemoryFactor = conf.clientPushSortMaxMemoryFactor();
     this.pushSortMemoryThreshold = pushSortMemoryThreshold;
 
     this.memoryThresholdManager =
-        new MemoryThresholdManager(
-            this.numPartitions,
-            this.pushBufferMaxSize,
-            conf.clientPushSortSmallPushTolerateFactor());
+        new MemoryThresholdManager(maxMemoryFactor, 
conf.clientPushSortSmallPushTolerateFactor());
 
     int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
     this.inMemSorter = new ShuffleInMemorySorter(this, initialSize);
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 99784c578..69b3a0421 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -845,6 +845,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD)
   def clientPushSortSmallPushTolerateFactor: Double =
     get(CLIENT_PUSH_SORT_SMALL_PUSH_TOLERATE_FACTOR)
+  def clientPushSortMaxMemoryFactor: Double =
+    get(CLIENT_PUSH_SORT_MAX_MEMORY_FACTOR)
   def clientPushSortRandomizePartitionIdEnabled: Boolean =
     get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
   def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
@@ -4022,7 +4024,6 @@ object CelebornConf extends Logging {
 
   val CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD: ConfigEntry[Boolean] =
     buildConf("celeborn.client.spark.push.sort.memory.useAdaptiveThreshold")
-      .withAlternative("celeborn.push.sortMemory.useAdaptiveThreshold")
       .categories("client")
       .doc("Adaptively adjust sort-based shuffle writer's memory threshold")
       .version("0.5.0")
@@ -4030,10 +4031,9 @@ object CelebornConf extends Logging {
       .createWithDefault(false)
 
   val CLIENT_PUSH_SORT_SMALL_PUSH_TOLERATE_FACTOR: ConfigEntry[Double] =
-    buildConf("celeborn.client.spark.push.sort.smallPushTolerateFactor")
-      .withAlternative("celeborn.push.sortMemory.adaptiveThreshold")
+    buildConf("celeborn.client.spark.push.sort.memory.smallPushTolerateFactor")
       .categories("client")
-      .doc("Only be in effect when 
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is" +
+      .doc(s"Only be in effect when 
${CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD.key} is" +
         " turned on. The larger this value is, the more aggressive Celeborn 
will enlarge the " +
         " Sort-based Shuffle writer's memory threshold. Specifically, this 
config controls when to" +
         " enlarge the sort shuffle writer's memory threshold. With N bytes 
data in memory and V as" +
@@ -4045,6 +4045,17 @@ object CelebornConf extends Logging {
       .checkValue(v => v >= 0.0, "Value must be no less than 0")
       .createWithDefault(0.2)
 
+  val CLIENT_PUSH_SORT_MAX_MEMORY_FACTOR: ConfigEntry[Double] =
+    buildConf("celeborn.client.spark.push.sort.memory.maxMemoryFactor")
+      .categories("client")
+      .doc(
+        "the max portion of executor memory which can be used for 
SortBasedWriter buffer (only" +
+          s" valid when ${CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD.key} 
is enabled")
+      .version("0.5.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1 
(inclusive)")
+      .createWithDefault(0.4)
+
   val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
     buildConf("celeborn.test.alternative.key")
       .withAlternative("celeborn.test.alternative.deprecatedKey")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index d3cf2ba0d..48763e6d6 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -103,9 +103,10 @@ license: |
 | celeborn.client.spark.fetch.throwsFetchFailure | false | false | client 
throws FetchFailedException instead of CelebornIOException | 0.4.0 |  | 
 | celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | 
Whether to dynamically switch push write mode based on conditions.If true, 
shuffle mode will be only determined by partition count | 0.5.0 |  | 
 | celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | 
false | Threshold of shuffle partition number for dynamically switching push 
writer mode. When the shuffle partition number is greater than this value, use 
the sort-based shuffle writer for memory efficiency; otherwise use the 
hash-based shuffle writer for speed. This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 |  | 
+| celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | the 
max portion of executor memory which can be used for SortBasedWriter buffer 
(only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is 
enabled | 0.5.0 |  | 
+| celeborn.client.spark.push.sort.memory.smallPushTolerateFactor | 0.2 | false 
| Only be in effect when 
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is turned on. The 
larger this value is, the more aggressive Celeborn will enlarge the  Sort-based 
Shuffle writer's memory threshold. Specifically, this config controls when to 
enlarge the sort shuffle writer's memory threshold. With N bytes data in memory 
and V as the value of this config, if the number of pushes, C, when usin [...]
 | celeborn.client.spark.push.sort.memory.threshold | 64m | false | When 
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0 
| celeborn.push.sortMemory.threshold | 
-| celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false 
| Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 | 
celeborn.push.sortMemory.useAdaptiveThreshold | 
-| celeborn.client.spark.push.sort.smallPushTolerateFactor | 0.2 | false | Only 
be in effect when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold 
is turned on. The larger this value is, the more aggressive Celeborn will 
enlarge the  Sort-based Shuffle writer's memory threshold. Specifically, this 
config controls when to enlarge the sort shuffle writer's memory threshold. 
With N bytes data in memory and V as the value of this config, if the number of 
pushes, C, when using sort  [...]
+| celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false 
| Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 |  | 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | false | This 
is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If 
you have changed UnsafeRow's memory layout set this to false. | 0.2.2 |  | 
 | celeborn.client.spark.shuffle.checkWorker.enabled | true | false | When 
true, before registering shuffle, LifecycleManager should check if current 
cluster have available workers, if cluster don't have available workers, 
fallback to Spark's default shuffle | 0.5.0 |  | 
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | false | 
Whether force fallback shuffle to Spark's default. | 0.3.0 | 
celeborn.shuffle.forceFallback.enabled | 

Reply via email to