This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c6c319d86 [CELEBORN-1309][FOLLOWUP] Cap the max memory can be used for
sort buffer
c6c319d86 is described below
commit c6c319d86558f21b8d01bccb8cac4603d06b1a3e
Author: CodingCat <[email protected]>
AuthorDate: Mon Mar 25 12:08:04 2024 +0800
[CELEBORN-1309][FOLLOWUP] Cap the max memory that can be used for sort buffer
### What changes were proposed in this pull request?
add a new parameter to cap the max memory that can be used for the sort writer buffer
### Why are the changes needed?
with a huge number of partitions, the threshold computed as buffer size *
number of partitions can be too large without this cap, e.g. 64K * 100000 = 6.4G
### Does this PR introduce _any_ user-facing change?
a new parameter
### How was this patch tested?
unit test
Closes #2388 from CodingCat/adaptive_followup.
Lead-authored-by: CodingCat <[email protected]>
Co-authored-by: zky.zhoukeyong <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../spark/shuffle/celeborn/SortBasedPusher.java | 13 ++++++-------
.../org/apache/celeborn/common/CelebornConf.scala | 19 +++++++++++++++----
docs/configuration/client.md | 5 +++--
3 files changed, 24 insertions(+), 13 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index b8d1ab00d..7147df6f3 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -49,9 +49,8 @@ public class SortBasedPusher extends MemoryConsumer {
private final long maxMemoryThresholdInBytes;
private final double smallPushTolerateFactor;
- MemoryThresholdManager(
- int numPartitions, long sendBufferSizeInBytes, double
smallPushTolerateFactor) {
- this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+ MemoryThresholdManager(double maxMemoryFactor, double
smallPushTolerateFactor) {
+ this.maxMemoryThresholdInBytes = (long)
(Runtime.getRuntime().maxMemory() * maxMemoryFactor);
this.smallPushTolerateFactor = smallPushTolerateFactor;
}
@@ -131,6 +130,8 @@ public class SortBasedPusher extends MemoryConsumer {
private final boolean useAdaptiveThreshold;
+ private final double maxMemoryFactor;
+
public SortBasedPusher(
TaskMemoryManager memoryManager,
ShuffleClient shuffleClient,
@@ -192,13 +193,11 @@ public class SortBasedPusher extends MemoryConsumer {
pushBufferMaxSize = conf.clientPushBufferMaxSize();
useAdaptiveThreshold = conf.clientPushSortUseAdaptiveMemoryThreshold();
+ maxMemoryFactor = conf.clientPushSortMaxMemoryFactor();
this.pushSortMemoryThreshold = pushSortMemoryThreshold;
this.memoryThresholdManager =
- new MemoryThresholdManager(
- this.numPartitions,
- this.pushBufferMaxSize,
- conf.clientPushSortSmallPushTolerateFactor());
+ new MemoryThresholdManager(maxMemoryFactor,
conf.clientPushSortSmallPushTolerateFactor());
int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
this.inMemSorter = new ShuffleInMemorySorter(this, initialSize);
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 99784c578..69b3a0421 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -845,6 +845,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD)
def clientPushSortSmallPushTolerateFactor: Double =
get(CLIENT_PUSH_SORT_SMALL_PUSH_TOLERATE_FACTOR)
+ def clientPushSortMaxMemoryFactor: Double =
+ get(CLIENT_PUSH_SORT_MAX_MEMORY_FACTOR)
def clientPushSortRandomizePartitionIdEnabled: Boolean =
get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
@@ -4022,7 +4024,6 @@ object CelebornConf extends Logging {
val CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.push.sort.memory.useAdaptiveThreshold")
- .withAlternative("celeborn.push.sortMemory.useAdaptiveThreshold")
.categories("client")
.doc("Adaptively adjust sort-based shuffle writer's memory threshold")
.version("0.5.0")
@@ -4030,10 +4031,9 @@ object CelebornConf extends Logging {
.createWithDefault(false)
val CLIENT_PUSH_SORT_SMALL_PUSH_TOLERATE_FACTOR: ConfigEntry[Double] =
- buildConf("celeborn.client.spark.push.sort.smallPushTolerateFactor")
- .withAlternative("celeborn.push.sortMemory.adaptiveThreshold")
+ buildConf("celeborn.client.spark.push.sort.memory.smallPushTolerateFactor")
.categories("client")
- .doc("Only be in effect when
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is" +
+ .doc(s"Only be in effect when
${CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD.key} is" +
" turned on. The larger this value is, the more aggressive Celeborn
will enlarge the " +
" Sort-based Shuffle writer's memory threshold. Specifically, this
config controls when to" +
" enlarge the sort shuffle writer's memory threshold. With N bytes
data in memory and V as" +
@@ -4045,6 +4045,17 @@ object CelebornConf extends Logging {
.checkValue(v => v >= 0.0, "Value must be no less than 0")
.createWithDefault(0.2)
+ val CLIENT_PUSH_SORT_MAX_MEMORY_FACTOR: ConfigEntry[Double] =
+ buildConf("celeborn.client.spark.push.sort.memory.maxMemoryFactor")
+ .categories("client")
+ .doc(
+ "the max portion of executor memory which can be used for
SortBasedWriter buffer (only" +
+ s" valid when ${CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD.key}
is enabled")
+ .version("0.5.0")
+ .doubleConf
+ .checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1
(inclusive)")
+ .createWithDefault(0.4)
+
val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
buildConf("celeborn.test.alternative.key")
.withAlternative("celeborn.test.alternative.deprecatedKey")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index d3cf2ba0d..48763e6d6 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -103,9 +103,10 @@ license: |
| celeborn.client.spark.fetch.throwsFetchFailure | false | false | client
throws FetchFailedException instead of CelebornIOException | 0.4.0 | |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | false |
Whether to dynamically switch push write mode based on conditions.If true,
shuffle mode will be only determined by partition count | 0.5.0 | |
| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 |
false | Threshold of shuffle partition number for dynamically switching push
writer mode. When the shuffle partition number is greater than this value, use
the sort-based shuffle writer for memory efficiency; otherwise use the
hash-based shuffle writer for speed. This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | |
+| celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | the
max portion of executor memory which can be used for SortBasedWriter buffer
(only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is
enabled | 0.5.0 | |
+| celeborn.client.spark.push.sort.memory.smallPushTolerateFactor | 0.2 | false
| Only be in effect when
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is turned on. The
larger this value is, the more aggressive Celeborn will enlarge the Sort-based
Shuffle writer's memory threshold. Specifically, this config controls when to
enlarge the sort shuffle writer's memory threshold. With N bytes data in memory
and V as the value of this config, if the number of pushes, C, when usin [...]
| celeborn.client.spark.push.sort.memory.threshold | 64m | false | When
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0
| celeborn.push.sortMemory.threshold |
-| celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false
| Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 |
celeborn.push.sortMemory.useAdaptiveThreshold |
-| celeborn.client.spark.push.sort.smallPushTolerateFactor | 0.2 | false | Only
be in effect when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold
is turned on. The larger this value is, the more aggressive Celeborn will
enlarge the Sort-based Shuffle writer's memory threshold. Specifically, this
config controls when to enlarge the sort shuffle writer's memory threshold.
With N bytes data in memory and V as the value of this config, if the number of
pushes, C, when using sort [...]
+| celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false
| Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 | |
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | false | This
is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If
you have changed UnsafeRow's memory layout set this to false. | 0.2.2 | |
| celeborn.client.spark.shuffle.checkWorker.enabled | true | false | When
true, before registering shuffle, LifecycleManager should check if current
cluster have available workers, if cluster don't have available workers,
fallback to Spark's default shuffle | 0.5.0 | |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false |
Whether force fallback shuffle to Spark's default. | 0.3.0 |
celeborn.shuffle.forceFallback.enabled |