This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a42e8fbd [CELEBORN] Add config to control celeborn fallback for CI (#6230)
3a42e8fbd is described below

commit 3a42e8fbd3797390a554839ef10e6f9b073460d6
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jun 27 11:49:15 2024 +0800

    [CELEBORN] Add config to control celeborn fallback for CI (#6230)
---
 .../shuffle/gluten/celeborn/CelebornShuffleManager.java      | 12 +++++++++---
 .../src/main/scala/org/apache/gluten/GlutenConfig.scala      | 10 ++++++++++
 .../main/scala/org/apache/gluten/integration/Constants.scala |  1 +
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index d196691d1..63fb0cc1b 100644
--- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.shuffle.gluten.celeborn;
 
+import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.backendsapi.BackendsApiManager;
 import org.apache.gluten.exception.GlutenException;
 
@@ -194,9 +195,14 @@ public class CelebornShuffleManager implements 
ShuffleManager {
     if (dependency instanceof ColumnarShuffleDependency) {
       if (fallbackPolicyRunner.applyAllFallbackPolicy(
           lifecycleManager, dependency.partitioner().numPartitions())) {
-        logger.warn("Fallback to ColumnarShuffleManager!");
-        columnarShuffleIds.add(shuffleId);
-        return columnarShuffleManager().registerShuffle(shuffleId, dependency);
+        if (GlutenConfig.getConf().enableCelebornFallback()) {
+          logger.warn("Fallback to ColumnarShuffleManager!");
+          columnarShuffleIds.add(shuffleId);
+          return columnarShuffleManager().registerShuffle(shuffleId, 
dependency);
+        } else {
+          throw new GlutenException(
+              "The Celeborn service(Master: " + celebornConf.masterHost() + ") 
is unavailable");
+        }
       } else {
         return registerCelebornShuffleHandle(shuffleId, dependency);
       }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 89933cc58..58b99a7f3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -447,6 +447,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
     conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
 
   def enableHiveFileFormatWriter: Boolean = 
conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
+
+  def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
 }
 
 object GlutenConfig {
@@ -2049,4 +2051,12 @@ object GlutenConfig {
       .doubleConf
       .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must 
between [0, 1]")
       .createWithDefault(0.6)
+
+  val CELEBORN_FALLBACK_ENABLED =
+    
buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled")
+      .internal()
+      .doc("If enabled, fall back to ColumnarShuffleManager when celeborn 
service is unavailable." +
+        "Otherwise, throw an exception.")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
index 50766f3a9..e680ce9d5 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
@@ -44,6 +44,7 @@ object Constants {
 
   val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false)
     .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
+    .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", 
"false")
     .set("spark.sql.parquet.enableVectorizedReader", "true")
     .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
     .set(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to