This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch add-config in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 4d176ec470b00986561a9674af05f0e37bc35d0c
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 17:33:27 2024 +0800

    [CELEBORN-1267] Add config to control worker check in CelebornShuffleFallbackPolicyRunner
---
 .../celeborn/CelebornShuffleFallbackPolicyRunner.scala        |  4 ++++
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 2 files changed, 15 insertions(+)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 6248e08ff..20743467c 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    if (!conf.checkWorkerEnabled) {
+      return true
+    }
+
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
       logWarning(s"No workers available for current user ${lifecycleManager.getUserIdentifier}.")
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c4aad0306..8ebba9f26 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -884,6 +884,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
   def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
   def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
+  def checkWorkerEnabled: Boolean = get(CHECK_WORKER_ENABLED)
   def shuffleForceFallbackPartitionThreshold: Long =
     get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
   def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
@@ -4003,6 +4004,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val CHECK_WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.spark.shuffle.checkWorker.enabled")
+      .categories("client")
+      .doc("When true, before registering shuffle, LifecycleManager should check " +
+        "if current cluster have available workers, if cluster don't have available " +
+        "workers, fallback to Spark's default shuffle")
+      .version("0.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
     buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
       .withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
