This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch add-config
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 11188fb1d6408f7b54588947a8954150f867279c
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 17:33:27 2024 +0800

    [CELEBORN-1267] Add config to control worker check in CelebornShuffleFallbackPolicyRunner
---
 .../celeborn/CelebornShuffleFallbackPolicyRunner.scala        |  4 ++++
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 2 files changed, 15 insertions(+)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 6248e08ff..ddf06ab43 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    if (!conf.workerEnabled) {
+      return true
+    }
+
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
       logWarning(s"No workers available for current user 
${lifecycleManager.getUserIdentifier}.")
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c4aad0306..948c7fd3c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -754,6 +754,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   //                      Quota                         //
   // //////////////////////////////////////////////////////
   def quotaEnabled: Boolean = get(QUOTA_ENABLED)
+  def workerEnabled: Boolean = get(WORKER_ENABLED) // NOTE(review): gates the worker-availability check, not quota; consider moving out of this section
   def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
   def quotaManagerClass: String = get(QUOTA_MANAGER)
   def quotaConfigurationPath: Option[String] = get(QUOTA_CONFIGURATION_PATH)
@@ -4161,6 +4162,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.enabled") // NOTE(review): used by the client fallback check; a celeborn.client.* key may fit better
+      .categories("client")
+      .doc("When true, before registering shuffle, LifecycleManager checks whether the current " +
+        "user has available workers, and falls back to Spark's default shuffle if none are " +
+        "available. When false, the check is skipped and workers are treated as available.")
+      .version("0.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val QUOTA_IDENTITY_PROVIDER: ConfigEntry[String] =
     buildConf("celeborn.quota.identity.provider")
       .categories("quota")

Reply via email to