This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 62036bf1d [CELEBORN] Add compression for row-based shuffle (#6380)
62036bf1d is described below

commit 62036bf1d0837fe5c941e5ac9653e183379f5ac8
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jul 11 15:06:10 2024 +0800

    [CELEBORN] Add compression for row-based shuffle (#6380)
---
 .../gluten/celeborn/CelebornShuffleManager.java    | 29 ++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index 6b450dea7..a0516d177 100644
--- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -54,6 +54,12 @@ public class CelebornShuffleManager implements ShuffleManager {
   private static final String LOCAL_SHUFFLE_READER_KEY =
       "spark.sql.adaptive.localShuffleReader.enabled";
 
+  private static final String CELEBORN_COMPRESSION_CODEC_KEY =
+      CelebornConf.SHUFFLE_COMPRESSION_CODEC().key();
+
+  private static final String SPARK_CELEBORN_COMPRESSION_CODEC_KEY =
+      "spark." + CELEBORN_COMPRESSION_CODEC_KEY;
+
   private static final CelebornShuffleWriterFactory writerFactory;
 
   static {
@@ -78,6 +84,8 @@ public class CelebornShuffleManager implements ShuffleManager {
 
   private final SparkConf conf;
   private final CelebornConf celebornConf;
+  private final SparkConf rowBasedConf;
+  private final CelebornConf rowBasedCelebornConf;
   // either be "{appId}_{appAttemptId}" or "{appId}"
   private String appUniqueId;
 
@@ -89,6 +97,8 @@ public class CelebornShuffleManager implements ShuffleManager {
       ConcurrentHashMap.newKeySet();
   private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
 
+  private final String celebornDefaultCodec;
+
   // for Celeborn 0.4.0
   private final Object shuffleIdTracker;
 
@@ -110,6 +120,16 @@ public class CelebornShuffleManager implements ShuffleManager {
         
CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME);
 
     this.throwsFetchFailure = 
CelebornUtils.getThrowsFetchFailure(celebornConf);
+
+    this.celebornDefaultCodec = 
CelebornConf.SHUFFLE_COMPRESSION_CODEC().defaultValueString();
+
+    this.rowBasedConf = conf.clone();
+    this.rowBasedCelebornConf = celebornConf.clone();
+    if ("none"
+        .equalsIgnoreCase(conf.get(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, 
celebornDefaultCodec))) {
+      rowBasedConf.set(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, 
celebornDefaultCodec);
+      rowBasedCelebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, 
celebornDefaultCodec);
+    }
   }
 
   private boolean isDriver() {
@@ -133,7 +153,8 @@ public class CelebornShuffleManager implements ShuffleManager {
       synchronized (this) {
         if (_vanillaCelebornShuffleManager == null) {
           _vanillaCelebornShuffleManager =
-              
SparkUtils.instantiateClass(VANILLA_CELEBORN_SHUFFLE_MANAGER_NAME, conf, 
isDriver());
+              SparkUtils.instantiateClass(
+                  VANILLA_CELEBORN_SHUFFLE_MANAGER_NAME, rowBasedConf, 
isDriver());
         }
       }
     }
@@ -330,6 +351,10 @@ public class CelebornShuffleManager implements ShuffleManager {
     if (handle instanceof CelebornShuffleHandle) {
       @SuppressWarnings("unchecked")
       CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) 
handle;
+      CelebornConf readerConf = celebornConf;
+      if (!(h.dependency() instanceof ColumnarShuffleDependency)) {
+        readerConf = rowBasedCelebornConf;
+      }
       return CelebornUtils.getCelebornShuffleReader(
           h,
           startPartition,
@@ -337,7 +362,7 @@ public class CelebornShuffleManager implements ShuffleManager {
           startMapIndex,
           endMapIndex,
           context,
-          celebornConf,
+          readerConf,
           metrics,
           shuffleIdTracker);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to