This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e66d509a9 [CELEBORN-1369][FOLLOWUP] Improve docs for shuffle fallback 
policy
e66d509a9 is described below

commit e66d509a956a80af83cb0eba9271a4f6a458da1f
Author: Cheng Pan <[email protected]>
AuthorDate: Wed May 15 19:18:39 2024 +0800

    [CELEBORN-1369][FOLLOWUP] Improve docs for shuffle fallback policy
    
    ### What changes were proposed in this pull request?
    
    Improve docs for shuffle fallback policy
    
    Rename a configuration
    
    ```patch
    - celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold
    + celeborn.client.spark.shuffle.fallback.numPartitionsThreshold
    ```
    
    ### Why are the changes needed?
    
    Canonicalize the words to "spark built-in shuffle implementation" 
everywhere.
    
    And `...forceFallback...` is confusing, use `...fallback...` with explicit 
docs instead.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Deprecates a configuration, but it remains effective.
    
    ### How was this patch tested?
    
    Pass CI.
    
    Closes #2494 from pan3793/CELEBORN-1369-followup.
    
    Lead-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../CelebornShuffleFallbackPolicyRunner.scala      | 40 ++++++++++++++--------
 .../CelebornShuffleFallbackPolicyRunner.scala      | 38 +++++++++++---------
 .../org/apache/celeborn/common/CelebornConf.scala  | 29 +++++++++-------
 docs/configuration/client.md                       |  6 ++--
 4 files changed, 67 insertions(+), 46 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 24510ba15..48e0825e6 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,52 +25,57 @@ import org.apache.celeborn.common.protocol.FallbackPolicy
 
 class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
   private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
+  private val checkWorkerEnabled = conf.checkWorkerEnabled
+  private val quotaEnabled = conf.quotaEnabled
+  private val numPartitionsThreshold = conf.shuffleFallbackPartitionThreshold
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, 
numPartitions: Int): Boolean = {
     val needFallback =
       applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
         !checkQuota(lifecycleManager) || 
!checkWorkersAvailable(lifecycleManager)
     if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
-      throw new CelebornIOException("Fallback to Spark's default shuffle is 
prohibited.")
+      throw new CelebornIOException(
+        "Fallback to spark built-in shuffle implementation is prohibited.")
     }
     needFallback
   }
 
   /**
-   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to 
external shuffle
-   * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
+   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to 
spark built-in shuffle implementation
+   * @return return true if celeborn.client.spark.shuffle.fallback.policy is 
ALWAYS, otherwise false
    */
   def applyForceFallbackPolicy(): Boolean = {
     if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
       logWarning(
-        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.ALWAYS.name}, which will force fallback.")
+        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.ALWAYS.name}, " +
+          s"forcibly fallback to spark built-in shuffle implementation.")
     }
     FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
   }
 
   /**
-   * if shuffle partitions > 
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external 
shuffle
+   * if shuffle partitions > celeborn.shuffle.fallback.numPartitionsThreshold, 
fallback to spark built-in
+   * shuffle implementation
    * @param numPartitions shuffle partitions
-   * @return return if shuffle partitions bigger than limit
+   * @return return true if shuffle partitions bigger than limit, otherwise 
false
    */
   def applyShufflePartitionsFallbackPolicy(numPartitions: Int): Boolean = {
-    val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
-    val needFallback = numPartitions >= confNumPartitions
+    val needFallback = numPartitions >= numPartitionsThreshold
     if (needFallback) {
-      logWarning(s"Shuffle num of partitions: $numPartitions" +
-        s" is bigger than the limit: $confNumPartitions," +
-        s" need fallback to spark shuffle")
+      logWarning(
+        s"Shuffle partition number: $numPartitions exceeds threshold: 
$numPartitionsThreshold, " +
+          "need to fallback to spark built-in shuffle implementation.")
     }
     needFallback
   }
 
   /**
-   * If celeborn cluster is exceed current user's quota, fallback to external 
shuffle
+   * If celeborn cluster exceeds current user's quota, fallback to spark 
built-in shuffle implementation
    *
    * @return if celeborn cluster have available space for current user
    */
   def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
-    if (!conf.quotaEnabled) {
+    if (!quotaEnabled) {
       return true
     }
 
@@ -83,14 +88,19 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
   }
 
   /**
-   * If celeborn cluster has no available workers, fallback to external 
shuffle.
+   * If celeborn cluster has no available workers, fallback to spark built-in 
shuffle implementation
    *
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+    if (!checkWorkerEnabled) {
+      return true
+    }
+
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
-      logWarning(s"No workers available for current user 
${lifecycleManager.getUserIdentifier}.")
+      logWarning(
+        s"No celeborn workers available for current user 
${lifecycleManager.getUserIdentifier}.")
     }
     resp.getAvailable
   }
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 0147f6cfc..48e0825e6 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,52 +25,57 @@ import org.apache.celeborn.common.protocol.FallbackPolicy
 
 class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
   private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
+  private val checkWorkerEnabled = conf.checkWorkerEnabled
+  private val quotaEnabled = conf.quotaEnabled
+  private val numPartitionsThreshold = conf.shuffleFallbackPartitionThreshold
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, 
numPartitions: Int): Boolean = {
     val needFallback =
       applyForceFallbackPolicy() || 
applyShufflePartitionsFallbackPolicy(numPartitions) ||
         !checkQuota(lifecycleManager) || 
!checkWorkersAvailable(lifecycleManager)
     if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
-      throw new CelebornIOException("Fallback to Spark's default shuffle is 
prohibited.")
+      throw new CelebornIOException(
+        "Fallback to spark built-in shuffle implementation is prohibited.")
     }
     needFallback
   }
 
   /**
-   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to 
external shuffle
-   * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
+   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to 
spark built-in shuffle implementation
+   * @return return true if celeborn.client.spark.shuffle.fallback.policy is 
ALWAYS, otherwise false
    */
   def applyForceFallbackPolicy(): Boolean = {
     if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
       logWarning(
-        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.ALWAYS.name}, which will force fallback.")
+        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.ALWAYS.name}, " +
+          s"forcibly fallback to spark built-in shuffle implementation.")
     }
     FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
   }
 
   /**
-   * if shuffle partitions > 
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external 
shuffle
+   * if shuffle partitions > celeborn.shuffle.fallback.numPartitionsThreshold, 
fallback to spark built-in
+   * shuffle implementation
    * @param numPartitions shuffle partitions
-   * @return return if shuffle partitions bigger than limit
+   * @return return true if shuffle partitions bigger than limit, otherwise 
false
    */
   def applyShufflePartitionsFallbackPolicy(numPartitions: Int): Boolean = {
-    val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
-    val needFallback = numPartitions >= confNumPartitions
+    val needFallback = numPartitions >= numPartitionsThreshold
     if (needFallback) {
-      logWarning(s"Shuffle num of partitions: $numPartitions" +
-        s" is bigger than the limit: $confNumPartitions," +
-        s" need fallback to spark shuffle")
+      logWarning(
+        s"Shuffle partition number: $numPartitions exceeds threshold: 
$numPartitionsThreshold, " +
+          "need to fallback to spark built-in shuffle implementation.")
     }
     needFallback
   }
 
   /**
-   * If celeborn cluster is exceed current user's quota, fallback to external 
shuffle
+   * If celeborn cluster exceeds current user's quota, fallback to spark 
built-in shuffle implementation
    *
    * @return if celeborn cluster have available space for current user
    */
   def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
-    if (!conf.quotaEnabled) {
+    if (!quotaEnabled) {
       return true
     }
 
@@ -83,18 +88,19 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
   }
 
   /**
-   * If celeborn cluster has no available workers, fallback to external 
shuffle.
+   * If celeborn cluster has no available workers, fallback to spark built-in 
shuffle implementation
    *
    * @return if celeborn cluster has available workers.
    */
   def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
-    if (!conf.checkWorkerEnabled) {
+    if (!checkWorkerEnabled) {
       return true
     }
 
     val resp = lifecycleManager.checkWorkersAvailable()
     if (!resp.getAvailable) {
-      logWarning(s"No workers available for current user 
${lifecycleManager.getUserIdentifier}.")
+      logWarning(
+        s"No celeborn workers available for current user 
${lifecycleManager.getUserIdentifier}.")
     }
     resp.getAvailable
   }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index d43dab053..13eaed0b9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1012,8 +1012,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     }
   }
 
-  def shuffleForceFallbackPartitionThreshold: Long =
-    get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
+  def shuffleFallbackPartitionThreshold: Long = 
get(SPARK_SHUFFLE_FALLBACK_PARTITION_THRESHOLD)
   def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
   def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
   def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
@@ -4379,10 +4378,13 @@ object CelebornConf extends Logging {
     buildConf("celeborn.client.spark.shuffle.fallback.policy")
       .categories("client")
       .version("0.5.0")
-      .doc(
-        s"Celeborn supports the following kind of fallback policies. 1. 
${FallbackPolicy.ALWAYS.name}: force fallback shuffle to Spark's default; " +
-          s"2. ${FallbackPolicy.AUTO.name}: consider other factors like 
availability of enough workers and quota, or whether shuffle of partition 
number is lower than 
celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold; " +
-          s"3. ${FallbackPolicy.NEVER.name}: the job will fail if it is 
concluded that fallback is required based on factors above.")
+      .doc("Celeborn supports the following kind of fallback policies. " +
+        s"1. ${FallbackPolicy.ALWAYS.name}: always use spark built-in shuffle 
implementation; " +
+        s"2. ${FallbackPolicy.AUTO.name}: prefer to use celeborn shuffle 
implementation, and fallback to use spark " +
+        "built-in shuffle implementation based on certain factors, e.g. 
availability of enough workers and quota, " +
+        "shuffle partition number; " +
+        s"3. ${FallbackPolicy.NEVER.name}: always use celeborn shuffle 
implementation, and fail fast when it is " +
+        "concluded that fallback is required based on factors above.")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(Set(
@@ -4396,17 +4398,20 @@ object CelebornConf extends Logging {
       .withAlternative("celeborn.shuffle.forceFallback.enabled")
       .categories("client")
       .version("0.3.0")
-      .doc(s"Whether force fallback shuffle to Spark's default. This 
configuration only takes effect when 
${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.AUTO.name}.")
+      .doc("Always use spark built-in shuffle implementation. This 
configuration is deprecated, " +
+        s"consider configuring 
`${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key}` instead.")
       .booleanConf
       .createWithDefault(false)
 
-  val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
-    
buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
+  val SPARK_SHUFFLE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
+    buildConf("celeborn.client.spark.shuffle.fallback.numPartitionsThreshold")
       .withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
+      
.withAlternative("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
       .categories("client")
-      .version("0.3.0")
-      .doc(
-        s"Celeborn will only accept shuffle of partition number lower than 
this configuration value. This configuration only takes effect when 
${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is 
${FallbackPolicy.AUTO.name}.")
+      .version("0.5.0")
+      .doc("Celeborn will only accept shuffle of partition number lower than 
this configuration value. " +
+        s"This configuration only takes effect when 
`${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key}` " +
+        s"is `${FallbackPolicy.AUTO.name}`.")
       .longConf
       .createWithDefault(Int.MaxValue)
 
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 314cd90ae..0fe90d0ec 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -109,9 +109,9 @@ license: |
 | celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false 
| Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 |  | 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | false | This 
is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If 
you have changed UnsafeRow's memory layout set this to false. | 0.2.2 |  | 
 | celeborn.client.spark.shuffle.checkWorker.enabled | true | false | When 
true, before registering shuffle, LifecycleManager should check if current 
cluster have available workers, if cluster don't have available workers, 
fallback to Spark's default shuffle | 0.5.0 |  | 
-| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn 
supports the following kind of fallback policies. 1. ALWAYS: force fallback 
shuffle to Spark's default; 2. AUTO: consider other factors like availability 
of enough workers and quota, or whether shuffle of partition number is lower 
than celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold; 3. 
NEVER: the job will fail if it is concluded that fallback is required based on 
factors above. | 0.5.0 |  | 
-| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | 
Whether force fallback shuffle to Spark's default. This configuration only 
takes effect when celeborn.client.spark.shuffle.fallback.policy is AUTO. | 
0.3.0 | celeborn.shuffle.forceFallback.enabled | 
-| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 
2147483647 | false | Celeborn will only accept shuffle of partition number 
lower than this configuration value. This configuration only takes effect when 
celeborn.client.spark.shuffle.fallback.policy is AUTO. | 0.3.0 | 
celeborn.shuffle.forceFallback.numPartitionsThreshold | 
+| celeborn.client.spark.shuffle.fallback.numPartitionsThreshold | 2147483647 | 
false | Celeborn will only accept shuffle of partition number lower than this 
configuration value. This configuration only takes effect when 
`celeborn.client.spark.shuffle.fallback.policy` is `AUTO`. | 0.5.0 | 
celeborn.shuffle.forceFallback.numPartitionsThreshold,celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold
 | 
+| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn 
supports the following kind of fallback policies. 1. ALWAYS: always use spark 
built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle 
implementation, and fallback to use spark built-in shuffle implementation based 
on certain factors, e.g. availability of enough workers and quota, shuffle 
partition number; 3. NEVER: always use celeborn shuffle implementation, and 
fail fast when it is concluded th [...]
+| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always 
use spark built-in shuffle implementation. This configuration is deprecated, 
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 
0.3.0 | celeborn.shuffle.forceFallback.enabled | 
 | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. 
This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | 
celeborn.shuffle.writer | 
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 |  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the 
master will enable to check the quota via QuotaManager. When Client side sets 
to true, LifecycleManager will request Master side to check whether the current 
user has enough quota before registration of shuffle. Fallback to the default 
shuffle service of Spark when Master side checks that there is no enough quota 
for current user. | 0.2.0 |  | 

Reply via email to