This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3a42e8fbd [CELEBORN] Add config to control celeborn fallback for CI (#6230)
3a42e8fbd is described below
commit 3a42e8fbd3797390a554839ef10e6f9b073460d6
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jun 27 11:49:15 2024 +0800
[CELEBORN] Add config to control celeborn fallback for CI (#6230)
---
.../shuffle/gluten/celeborn/CelebornShuffleManager.java | 12 +++++++++---
.../src/main/scala/org/apache/gluten/GlutenConfig.scala | 10 ++++++++++
.../main/scala/org/apache/gluten/integration/Constants.scala | 1 +
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index d196691d1..63fb0cc1b 100644
--- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.spark.shuffle.gluten.celeborn;
+import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.exception.GlutenException;
@@ -194,9 +195,14 @@ public class CelebornShuffleManager implements ShuffleManager {
if (dependency instanceof ColumnarShuffleDependency) {
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
- logger.warn("Fallback to ColumnarShuffleManager!");
- columnarShuffleIds.add(shuffleId);
- return columnarShuffleManager().registerShuffle(shuffleId, dependency);
+ if (GlutenConfig.getConf().enableCelebornFallback()) {
+ logger.warn("Fallback to ColumnarShuffleManager!");
+ columnarShuffleIds.add(shuffleId);
+ return columnarShuffleManager().registerShuffle(shuffleId, dependency);
+ } else {
+ throw new GlutenException(
+ "The Celeborn service(Master: " + celebornConf.masterHost() + ") is unavailable");
+ }
} else {
return registerCelebornShuffleHandle(shuffleId, dependency);
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 89933cc58..58b99a7f3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -447,6 +447,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
def enableHiveFileFormatWriter: Boolean =
conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
+
+ def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
}
object GlutenConfig {
@@ -2049,4 +2051,12 @@ object GlutenConfig {
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]")
.createWithDefault(0.6)
+
+ val CELEBORN_FALLBACK_ENABLED =
+ buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled")
+ .internal()
+ .doc("If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable." +
+ "Otherwise, throw an exception.")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
index 50766f3a9..e680ce9d5 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
@@ -44,6 +44,7 @@ object Constants {
val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false)
.set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
+ .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false")
.set("spark.sql.parquet.enableVectorizedReader", "true")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]