This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c1837536a [CELEBORN-1267] Add config to control worker check in 
CelebornShuffleFallbackPolicyRunner
c1837536a is described below

commit c1837536a159454287e16664bc6be9b90a09a023
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 19:01:41 2024 +0800

    [CELEBORN-1267] Add config to control worker check in 
CelebornShuffleFallbackPolicyRunner
    
    ### What changes were proposed in this pull request?
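    As the title says: add the config 
`celeborn.client.spark.shuffle.checkWorker.enabled` (default `true`) to control 
whether CelebornShuffleFallbackPolicyRunner checks for available workers.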
    
    ### Why are the changes needed?
    In some scenarios, if Celeborn cannot be used, users want an error to be 
reported directly instead of falling back to Spark's default shuffle.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    CI
    
    Closes #2291 from kerwin-zk/add-config.
    
    Authored-by: xiyu.zk <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/CelebornShuffleFallbackPolicyRunner.scala        |  4 ++++
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 docs/configuration/client.md                                  |  1 +
 3 files changed, 16 insertions(+)

diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 6248e08ff..20743467c 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    if (!conf.checkWorkerEnabled) {
+      return true
+    }
+
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
       logWarning(s"No workers available for current user 
${lifecycleManager.getUserIdentifier}.")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c4aad0306..8ebba9f26 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -884,6 +884,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def shufflePartitionType: PartitionType = 
PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
   def shuffleRangeReadFilterEnabled: Boolean = 
get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
   def shuffleForceFallbackEnabled: Boolean = 
get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
+  def checkWorkerEnabled: Boolean = get(CHECK_WORKER_ENABLED)
   def shuffleForceFallbackPartitionThreshold: Long =
     get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
   def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
@@ -4003,6 +4004,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val CHECK_WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.spark.shuffle.checkWorker.enabled")
+      .categories("client")
+      .doc("When true, before registering shuffle, LifecycleManager should 
check " +
+        "if current cluster have available workers, if cluster don't have 
available " +
+        "workers, fallback to Spark's default shuffle")
+      .version("0.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
     
buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
       .withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 3b4b8836f..e52966c93 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -106,6 +106,7 @@ license: |
 | celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | 
Threshold of shuffle partition number for dynamically switching push writer 
mode. When the shuffle partition number is greater than this value, use the 
sort-based shuffle writer for memory efficiency; otherwise use the hash-based 
shuffle writer for speed. This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 |  | 
 | celeborn.client.spark.push.sort.memory.threshold | 64m | When 
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0 
| celeborn.push.sortMemory.threshold | 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is 
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you 
have changed UnsafeRow's memory layout set this to false. | 0.2.2 |  | 
+| celeborn.client.spark.shuffle.checkWorker.enabled | true | When true, before 
registering shuffle, LifecycleManager should check if current cluster have 
available workers, if cluster don't have available workers, fallback to Spark's 
default shuffle | 0.5.0 |  | 
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force 
fallback shuffle to Spark's default. | 0.3.0 | 
celeborn.shuffle.forceFallback.enabled | 
 | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 
2147483647 | Celeborn will only accept shuffle of partition number lower than 
this configuration value. | 0.3.0 | 
celeborn.shuffle.forceFallback.numPartitionsThreshold | 
 | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. 
This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | 
celeborn.shuffle.writer | 

Reply via email to