This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4304be1a6 [CELEBORN-1172][SPARK] Support dynamic switch shuffle push
write mode based on partition number
4304be1a6 is described below
commit 4304be1a60c4d3371354ff92f4ed4a807d8840e2
Author: liangyongyuan <[email protected]>
AuthorDate: Thu Dec 21 16:58:51 2023 +0800
[CELEBORN-1172][SPARK] Support dynamic switch shuffle push write mode based
on partition number
### What changes were proposed in this pull request?
Dynamically determine the writing mode in Spark based on the number of
partitions.
### Why are the changes needed?
Enhance the flexibility of shuffle writes to improve performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add uts
Closes #2160 from lyy-pineapple/dynamic-write-mode.
Lead-authored-by: liangyongyuan <[email protected]>
Co-authored-by: cxzl25 <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../shuffle/celeborn/SparkShuffleManager.java | 29 ++++++++++++++++++--
.../celeborn/CelebornShuffleManagerSuite.scala | 22 +++++++++++++++
.../org/apache/celeborn/common/CelebornConf.scala | 32 ++++++++++++++++++++--
docs/configuration/client.md | 4 ++-
4 files changed, 80 insertions(+), 7 deletions(-)
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 03a38b933..f7c0e5985 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -110,7 +110,8 @@ public class SparkShuffleManager implements ShuffleManager {
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = executorCores(conf);
this.fallbackPolicyRunner = new
CelebornShuffleFallbackPolicyRunner(celebornConf);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
+ if ((ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
+ || celebornConf.dynamicWriteModeEnabled())
&& celebornConf.clientPushSortPipelineEnabled()) {
asyncPushers = new ExecutorService[cores];
for (int i = 0; i < asyncPushers.length; i++) {
@@ -281,7 +282,29 @@ public class SparkShuffleManager implements ShuffleManager
{
int shuffleId = SparkUtils.celebornShuffleId(shuffleClient, h,
context, true);
shuffleIdTracker.track(h.shuffleId(), shuffleId);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
+ ShuffleMode shuffleMode = celebornConf.shuffleWriterMode();
+ if (celebornConf.dynamicWriteModeEnabled()) {
+ int partitionCount = h.dependency().partitioner().numPartitions();
+ if (partitionCount >
celebornConf.dynamicWriteModePartitionNumThreshold()) {
+ logger.info(
+ "Shuffle {} write mode is changed to SORT because "
+ + "partition count {} is greater than threshold {}",
+ shuffleId,
+ partitionCount,
+ celebornConf.dynamicWriteModePartitionNumThreshold());
+ shuffleMode = ShuffleMode.SORT;
+ } else {
+ logger.info(
+ "Shuffle {} write mode is changed to HASH because "
+ + "partition count {} is less than threshold {}",
+ shuffleId,
+ partitionCount,
+ celebornConf.dynamicWriteModePartitionNumThreshold());
+ shuffleMode = ShuffleMode.HASH;
+ }
+ }
+
+ if (ShuffleMode.SORT.equals(shuffleMode)) {
ExecutorService pushThread =
celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
@@ -294,7 +317,7 @@ public class SparkShuffleManager implements ShuffleManager {
metrics,
pushThread,
SendBufferPool.get(cores, sendBufferPoolCheckInterval,
sendBufferPoolExpireTimeout));
- } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
+ } else if (ShuffleMode.HASH.equals(shuffleMode)) {
SendBufferPool pool =
SendBufferPool.get(cores, sendBufferPoolCheckInterval,
sendBufferPoolExpireTimeout);
if (COLUMNAR_SHUFFLE_CLASSES_PRESENT &&
celebornConf.columnarShuffleEnabled()) {
diff --git
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 019d3f48e..4e998ffc7 100644
---
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
+++
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -67,4 +67,26 @@ class SparkShuffleManagerSuite extends Logging {
// scalastyle:on println
sc.stop()
}
+
+ def testChangeWriteModeByPartitionCount(): Unit = {
+ val conf = new SparkConf().setIfMissing("spark.master", "local")
+ .setIfMissing(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
+ .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
+ .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
+ .set("spark.shuffle.service.enabled", "false")
+ .set("spark.shuffle.useOldFetchProtocol", "true")
+ .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+
.set(s"spark.${CelebornConf.CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED.key}",
"true")
+ .set(
+
s"spark.${CelebornConf.CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD.key}",
+ "15")
+ .setAppName("test")
+ val sc = new SparkContext(conf)
+ // scalastyle:off println
+ sc.parallelize(1 to 1000, 10).repartition(20).repartition(10).count()
+ // scalastyle:on println
+ sc.stop()
+ }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e89f30078..3e779c702 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -862,6 +862,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// Client Shuffle //
// //////////////////////////////////////////////////////
def shuffleWriterMode: ShuffleMode =
ShuffleMode.valueOf(get(SPARK_SHUFFLE_WRITER_MODE))
+ def dynamicWriteModeEnabled =
+ get(CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED)
+ def dynamicWriteModePartitionNumThreshold =
+ get(CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD)
def shufflePartitionType: PartitionType =
PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
def shuffleRangeReadFilterEnabled: Boolean =
get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
def shuffleForceFallbackEnabled: Boolean =
get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
@@ -3757,13 +3761,35 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.spark.push.dynamicWriteMode.enabled")
+ .categories("client")
+ .doc("Whether to dynamically switch push write mode based on
conditions. If true, " +
+ s"shuffle mode will be only determined by partition count")
+ .version("0.5.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD: ConfigEntry[Int]
=
+
buildConf("celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold")
+ .categories("client")
+ .doc(s"Threshold of shuffle partition number for dynamically switching
push writer mode. " +
+ s"When the shuffle partition number is greater than this value, " +
+ s"use the sort-based shuffle writer for memory efficiency; " +
+ s"otherwise use the hash-based shuffle writer for speed. " +
+ s"This configuration only takes effect when
${CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED.key} is true.")
+ .version("0.5.0")
+ .intConf
+ .createWithDefault(2000)
+
val SPARK_SHUFFLE_WRITER_MODE: ConfigEntry[String] =
buildConf("celeborn.client.spark.shuffle.writer")
.withAlternative("celeborn.shuffle.writer")
.categories("client")
- .doc("Celeborn supports the following kind of shuffle writers. 1. hash:
hash-based shuffle writer " +
- "works fine when shuffle partition count is normal; 2. sort:
sort-based shuffle writer works fine " +
- "when memory pressure is high or shuffle partition count is huge.")
+ .doc(s"Celeborn supports the following kind of shuffle writers. 1. hash:
hash-based shuffle writer " +
+ s"works fine when shuffle partition count is normal; 2. sort:
sort-based shuffle writer works fine " +
+ s"when memory pressure is high or shuffle partition count is huge. " +
+ s"This configuration only takes effect when
${CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED.key} is false.")
.version("0.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index d77cb9521..70f5e54b0 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -105,12 +105,14 @@ license: |
| celeborn.client.spark.io.encryption.enabled | true | whether to enable io
encryption | 0.4.0 |
| celeborn.client.spark.io.encryption.initialization.vector | | io encryption
initialization vector | 0.4.0 |
| celeborn.client.spark.io.encryption.key | | io encryption key | 0.4.0 |
+| celeborn.client.spark.push.dynamicWriteMode.enabled | false | Whether to
dynamically switch push write mode based on conditions. If true, shuffle mode
will be only determined by partition count | 0.5.0 |
+| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 |
Threshold of shuffle partition number for dynamically switching push writer
mode. When the shuffle partition number is greater than this value, use the
sort-based shuffle writer for memory efficiency; otherwise use the hash-based
shuffle writer for speed. This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 |
| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. If the
pipeline push feature is enabled
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher
will trigger a data push when the memory usage exceeds half of the threshold(by
default, 32m). | 0.3.0 |
| celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable
pipelining for sort based shuffle writer. If true, double buffering will be
used to pipeline push | 0.3.0 |
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you
have changed UnsafeRow's memory layout set this to false. | 0.2.2 |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force
fallback shuffle to Spark's default. | 0.3.0 |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | Celeborn will only accept shuffle of partition number lower than
this configuration value. | 0.3.0 |
-| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge. |
0.3.0 |
+| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge.
This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master
nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |