This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4304be1a6 [CELEBORN-1172][SPARK] Support dynamic switch shuffle push 
write mode based on partition number
4304be1a6 is described below

commit 4304be1a60c4d3371354ff92f4ed4a807d8840e2
Author: liangyongyuan <[email protected]>
AuthorDate: Thu Dec 21 16:58:51 2023 +0800

    [CELEBORN-1172][SPARK] Support dynamic switch shuffle push write mode based 
on partition number
    
    ### What changes were proposed in this pull request?
    Dynamically determine the writing mode in Spark based on the number of 
partitions.
    
    ### Why are the changes needed?
    Enhance the flexibility of shuffle writes to improve performance.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add unit tests (UTs)
    
    Closes #2160 from lyy-pineapple/dynamic-write-mode.
    
    Lead-authored-by: liangyongyuan <[email protected]>
    Co-authored-by: cxzl25 <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../shuffle/celeborn/SparkShuffleManager.java      | 29 ++++++++++++++++++--
 .../celeborn/CelebornShuffleManagerSuite.scala     | 22 +++++++++++++++
 .../org/apache/celeborn/common/CelebornConf.scala  | 32 ++++++++++++++++++++--
 docs/configuration/client.md                       |  4 ++-
 4 files changed, 80 insertions(+), 7 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 03a38b933..f7c0e5985 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -110,7 +110,8 @@ public class SparkShuffleManager implements ShuffleManager {
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = executorCores(conf);
     this.fallbackPolicyRunner = new 
CelebornShuffleFallbackPolicyRunner(celebornConf);
-    if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
+    if ((ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
+            || celebornConf.dynamicWriteModeEnabled())
         && celebornConf.clientPushSortPipelineEnabled()) {
       asyncPushers = new ExecutorService[cores];
       for (int i = 0; i < asyncPushers.length; i++) {
@@ -281,7 +282,29 @@ public class SparkShuffleManager implements ShuffleManager 
{
         int shuffleId = SparkUtils.celebornShuffleId(shuffleClient, h, 
context, true);
         shuffleIdTracker.track(h.shuffleId(), shuffleId);
 
-        if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
+        ShuffleMode shuffleMode = celebornConf.shuffleWriterMode();
+        if (celebornConf.dynamicWriteModeEnabled()) {
+          int partitionCount = h.dependency().partitioner().numPartitions();
+          if (partitionCount > 
celebornConf.dynamicWriteModePartitionNumThreshold()) {
+            logger.info(
+                "Shuffle {} write mode is changed to SORT because "
+                    + "partition count {} is greater than threshold {}",
+                shuffleId,
+                partitionCount,
+                celebornConf.dynamicWriteModePartitionNumThreshold());
+            shuffleMode = ShuffleMode.SORT;
+          } else {
+            logger.info(
+                "Shuffle {} write mode is changed to HASH because "
+                    + "partition count {} is less than or equal to threshold {}",
+                shuffleId,
+                partitionCount,
+                celebornConf.dynamicWriteModePartitionNumThreshold());
+            shuffleMode = ShuffleMode.HASH;
+          }
+        }
+
+        if (ShuffleMode.SORT.equals(shuffleMode)) {
           ExecutorService pushThread =
               celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
           return new SortBasedShuffleWriter<>(
@@ -294,7 +317,7 @@ public class SparkShuffleManager implements ShuffleManager {
               metrics,
               pushThread,
               SendBufferPool.get(cores, sendBufferPoolCheckInterval, 
sendBufferPoolExpireTimeout));
-        } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
+        } else if (ShuffleMode.HASH.equals(shuffleMode)) {
           SendBufferPool pool =
               SendBufferPool.get(cores, sendBufferPoolCheckInterval, 
sendBufferPoolExpireTimeout);
           if (COLUMNAR_SHUFFLE_CLASSES_PRESENT && 
celebornConf.columnarShuffleEnabled()) {
diff --git 
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
 
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 019d3f48e..4e998ffc7 100644
--- 
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
+++ 
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -67,4 +67,26 @@ class SparkShuffleManagerSuite extends Logging {
     // scalastyle:on println
     sc.stop()
   }
+
+  def testChangeWriteModeByPartitionCount(): Unit = {
+    val conf = new SparkConf().setIfMissing("spark.master", "local")
+      .setIfMissing(
+        "spark.shuffle.manager",
+        "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
+      .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
+      .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
+      .set("spark.shuffle.service.enabled", "false")
+      .set("spark.shuffle.useOldFetchProtocol", "true")
+      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      
.set(s"spark.${CelebornConf.CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED.key}", 
"true")
+      .set(
+        
s"spark.${CelebornConf.CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD.key}",
+        "15")
+      .setAppName("test")
+    val sc = new SparkContext(conf)
+    // scalastyle:off println
+    sc.parallelize(1 to 1000, 10).repartition(20).repartition(10).count()
+    // scalastyle:on println
+    sc.stop()
+  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e89f30078..3e779c702 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -862,6 +862,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   //                   Client Shuffle                    //
   // //////////////////////////////////////////////////////
   def shuffleWriterMode: ShuffleMode = 
ShuffleMode.valueOf(get(SPARK_SHUFFLE_WRITER_MODE))
+  def dynamicWriteModeEnabled =
+    get(CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED)
+  def dynamicWriteModePartitionNumThreshold =
+    get(CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD)
   def shufflePartitionType: PartitionType = 
PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
   def shuffleRangeReadFilterEnabled: Boolean = 
get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
   def shuffleForceFallbackEnabled: Boolean = 
get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
@@ -3757,13 +3761,35 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.spark.push.dynamicWriteMode.enabled")
+      .categories("client")
+      .doc("Whether to dynamically switch push write mode based on 
conditions. If true, " +
+        s"shuffle mode will be determined only by partition count")
+      .version("0.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD: ConfigEntry[Int] 
=
+    
buildConf("celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold")
+      .categories("client")
+      .doc(s"Threshold of shuffle partition number for dynamically switching 
push writer mode. " +
+        s"When the shuffle partition number is greater than this value, " +
+        s"use the sort-based shuffle writer for memory efficiency; " +
+        s"otherwise use the hash-based shuffle writer for speed. " +
+        s"This configuration only takes effect when 
${CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED.key} is true.")
+      .version("0.5.0")
+      .intConf
+      .createWithDefault(2000)
+
   val SPARK_SHUFFLE_WRITER_MODE: ConfigEntry[String] =
     buildConf("celeborn.client.spark.shuffle.writer")
       .withAlternative("celeborn.shuffle.writer")
       .categories("client")
-      .doc("Celeborn supports the following kind of shuffle writers. 1. hash: 
hash-based shuffle writer " +
-        "works fine when shuffle partition count is normal; 2. sort: 
sort-based shuffle writer works fine " +
-        "when memory pressure is high or shuffle partition count is huge.")
+      .doc(s"Celeborn supports the following kind of shuffle writers. 1. hash: 
hash-based shuffle writer " +
+        s"works fine when shuffle partition count is normal; 2. sort: 
sort-based shuffle writer works fine " +
+        s"when memory pressure is high or shuffle partition count is huge. " +
+        s"This configuration only takes effect when 
${CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED.key} is false.")
       .version("0.3.0")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index d77cb9521..70f5e54b0 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -105,12 +105,14 @@ license: |
 | celeborn.client.spark.io.encryption.enabled | true | whether to enable io 
encryption | 0.4.0 | 
 | celeborn.client.spark.io.encryption.initialization.vector |  | io encryption 
initialization vector | 0.4.0 | 
 | celeborn.client.spark.io.encryption.key |  | io encryption key | 0.4.0 | 
+| celeborn.client.spark.push.dynamicWriteMode.enabled | false | Whether to 
dynamically switch push write mode based on conditions. If true, shuffle mode 
will be determined only by partition count | 0.5.0 | 
+| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | 
Threshold of shuffle partition number for dynamically switching push writer 
mode. When the shuffle partition number is greater than this value, use the 
sort-based shuffle writer for memory efficiency; otherwise use the hash-based 
shuffle writer for speed. This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | 
 | celeborn.client.spark.push.sort.memory.threshold | 64m | When 
SortBasedPusher use memory over the threshold, will trigger push data. If the 
pipeline push feature is enabled 
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher 
will trigger a data push when the memory usage exceeds half of the threshold(by 
default, 32m). | 0.3.0 | 
 | celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable 
pipelining for sort based shuffle writer. If true, double buffering will be 
used to pipeline push | 0.3.0 | 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is 
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you 
have changed UnsafeRow's memory layout set this to false. | 0.2.2 | 
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force 
fallback shuffle to Spark's default. | 0.3.0 | 
 | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 
2147483647 | Celeborn will only accept shuffle of partition number lower than 
this configuration value. | 0.3.0 | 
-| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. | 
0.3.0 | 
+| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. 
This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | 
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master 
nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 | 
 | celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 

Reply via email to