This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch add-config
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 4d176ec470b00986561a9674af05f0e37bc35d0c
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 17:33:27 2024 +0800

    [CELEBORN-1267] Add config to control worker check in CelebornShuffleFallbackPolicyRunner
---
 .../celeborn/CelebornShuffleFallbackPolicyRunner.scala        |  4 ++++
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 2 files changed, 15 insertions(+)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 6248e08ff..20743467c 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    if (!conf.checkWorkerEnabled) {
+      return true
+    }
+
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
       logWarning(s"No workers available for current user ${lifecycleManager.getUserIdentifier}.")
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c4aad0306..8ebba9f26 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -884,6 +884,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
   def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
   def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
+  def checkWorkerEnabled: Boolean = get(CHECK_WORKER_ENABLED)
   def shuffleForceFallbackPartitionThreshold: Long =
     get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
   def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
@@ -4003,6 +4004,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val CHECK_WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.spark.shuffle.checkWorker.enabled")
+      .categories("client")
+      .doc("When true, before registering shuffle, LifecycleManager should check " +
+        "if current cluster have available workers, if cluster don't have available " +
+        "workers, fallback to Spark's default shuffle")
+      .version("0.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
     buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
       .withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")

Reply via email to