This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch add-config in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 11188fb1d6408f7b54588947a8954150f867279c
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 17:33:27 2024 +0800

    [CELEBORN-1267] Add config to control worker check in CelebornShuffleFallbackPolicyRunner
---
 .../celeborn/CelebornShuffleFallbackPolicyRunner.scala        |  4 ++++
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 2 files changed, 15 insertions(+)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 6248e08ff..ddf06ab43 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    if (!conf.workerEnabled) {
+      return true
+    }
+
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
       logWarning(s"No workers available for current user ${lifecycleManager.getUserIdentifier}.")
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c4aad0306..948c7fd3c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -754,6 +754,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   //                      Quota                          //
   // //////////////////////////////////////////////////////
   def quotaEnabled: Boolean = get(QUOTA_ENABLED)
+  def workerEnabled: Boolean = get(WORKER_ENABLED)
   def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
   def quotaManagerClass: String = get(QUOTA_MANAGER)
   def quotaConfigurationPath: Option[String] = get(QUOTA_CONFIGURATION_PATH)
@@ -4161,6 +4162,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.enabled")
+      .categories("worker")
+      .doc("When true, before registering shuffle, LifecycleManager should check " +
+        "if current user have available workers, if cluster don't have available " +
+        "workers for current user, fallback to Spark's default shuffle")
+      .version("0.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val QUOTA_IDENTITY_PROVIDER: ConfigEntry[String] =
     buildConf("celeborn.quota.identity.provider")
       .categories("quota")
