This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 62036bf1d [CELEBORN] Add compression for row-based shuffle (#6380)
62036bf1d is described below
commit 62036bf1d0837fe5c941e5ac9653e183379f5ac8
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jul 11 15:06:10 2024 +0800
[CELEBORN] Add compression for row-based shuffle (#6380)
---
.../gluten/celeborn/CelebornShuffleManager.java | 29 ++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index 6b450dea7..a0516d177 100644
--- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -54,6 +54,12 @@ public class CelebornShuffleManager implements ShuffleManager {
private static final String LOCAL_SHUFFLE_READER_KEY =
"spark.sql.adaptive.localShuffleReader.enabled";
+ private static final String CELEBORN_COMPRESSION_CODEC_KEY =
+ CelebornConf.SHUFFLE_COMPRESSION_CODEC().key();
+
+ private static final String SPARK_CELEBORN_COMPRESSION_CODEC_KEY =
+ "spark." + CELEBORN_COMPRESSION_CODEC_KEY;
+
private static final CelebornShuffleWriterFactory writerFactory;
static {
@@ -78,6 +84,8 @@ public class CelebornShuffleManager implements ShuffleManager {
private final SparkConf conf;
private final CelebornConf celebornConf;
+ private final SparkConf rowBasedConf;
+ private final CelebornConf rowBasedCelebornConf;
// either be "{appId}_{appAttemptId}" or "{appId}"
private String appUniqueId;
@@ -89,6 +97,8 @@ public class CelebornShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
+ private final String celebornDefaultCodec;
+
// for Celeborn 0.4.0
private final Object shuffleIdTracker;
@@ -110,6 +120,16 @@ public class CelebornShuffleManager implements ShuffleManager {
CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME);
this.throwsFetchFailure = CelebornUtils.getThrowsFetchFailure(celebornConf);
+
+ this.celebornDefaultCodec = CelebornConf.SHUFFLE_COMPRESSION_CODEC().defaultValueString();
+
+ this.rowBasedConf = conf.clone();
+ this.rowBasedCelebornConf = celebornConf.clone();
+ if ("none"
+ .equalsIgnoreCase(conf.get(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec))) {
+ rowBasedConf.set(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
+ rowBasedCelebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
+ }
}
private boolean isDriver() {
@@ -133,7 +153,8 @@ public class CelebornShuffleManager implements ShuffleManager {
synchronized (this) {
if (_vanillaCelebornShuffleManager == null) {
_vanillaCelebornShuffleManager =
- SparkUtils.instantiateClass(VANILLA_CELEBORN_SHUFFLE_MANAGER_NAME, conf, isDriver());
+ SparkUtils.instantiateClass(
+ VANILLA_CELEBORN_SHUFFLE_MANAGER_NAME, rowBasedConf, isDriver());
}
}
}
@@ -330,6 +351,10 @@ public class CelebornShuffleManager implements ShuffleManager {
if (handle instanceof CelebornShuffleHandle) {
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) handle;
+ CelebornConf readerConf = celebornConf;
+ if (!(h.dependency() instanceof ColumnarShuffleDependency)) {
+ readerConf = rowBasedCelebornConf;
+ }
return CelebornUtils.getCelebornShuffleReader(
h,
startPartition,
@@ -337,7 +362,7 @@ public class CelebornShuffleManager implements ShuffleManager {
startMapIndex,
endMapIndex,
context,
- celebornConf,
+ readerConf,
metrics,
shuffleIdTracker);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]