This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e66d509a9 [CELEBORN-1369][FOLLOWUP] Improve docs for shuffle fallback
policy
e66d509a9 is described below
commit e66d509a956a80af83cb0eba9271a4f6a458da1f
Author: Cheng Pan <[email protected]>
AuthorDate: Wed May 15 19:18:39 2024 +0800
[CELEBORN-1369][FOLLOWUP] Improve docs for shuffle fallback policy
### What changes were proposed in this pull request?
Improve docs for shuffle fallback policy
Rename a configuration
```patch
- celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold
+ celeborn.client.spark.shuffle.fallback.numPartitionsThreshold
```
### Why are the changes needed?
Canonicalize the words to "spark built-in shuffle implementation"
everywhere.
And `...forceFallback...` is confusing, use `...fallback...` with explicit
docs instead.
### Does this PR introduce _any_ user-facing change?
Deprecate a configuration, but it remains effective.
### How was this patch tested?
Pass CI.
Closes #2494 from pan3793/CELEBORN-1369-followup.
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../CelebornShuffleFallbackPolicyRunner.scala | 40 ++++++++++++++--------
.../CelebornShuffleFallbackPolicyRunner.scala | 38 +++++++++++---------
.../org/apache/celeborn/common/CelebornConf.scala | 29 +++++++++-------
docs/configuration/client.md | 6 ++--
4 files changed, 67 insertions(+), 46 deletions(-)
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 24510ba15..48e0825e6 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,52 +25,57 @@ import org.apache.celeborn.common.protocol.FallbackPolicy
class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
+ private val checkWorkerEnabled = conf.checkWorkerEnabled
+ private val quotaEnabled = conf.quotaEnabled
+ private val numPartitionsThreshold = conf.shuffleFallbackPartitionThreshold
def applyAllFallbackPolicy(lifecycleManager: LifecycleManager,
numPartitions: Int): Boolean = {
val needFallback =
applyForceFallbackPolicy() ||
applyShufflePartitionsFallbackPolicy(numPartitions) ||
!checkQuota(lifecycleManager) ||
!checkWorkersAvailable(lifecycleManager)
if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
- throw new CelebornIOException("Fallback to Spark's default shuffle is
prohibited.")
+ throw new CelebornIOException(
+ "Fallback to spark built-in shuffle implementation is prohibited.")
}
needFallback
}
/**
- * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to
external shuffle
- * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
+ * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to
spark built-in shuffle implementation
+ * @return return true if celeborn.client.spark.shuffle.fallback.policy is
ALWAYS, otherwise false
*/
def applyForceFallbackPolicy(): Boolean = {
if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
logWarning(
- s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.ALWAYS.name}, which will force fallback.")
+ s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.ALWAYS.name}, " +
+ s"forcibly fallback to spark built-in shuffle implementation.")
}
FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
}
/**
- * if shuffle partitions >
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external
shuffle
+ * if shuffle partitions > celeborn.shuffle.fallback.numPartitionsThreshold,
fallback to spark built-in
+ * shuffle implementation
* @param numPartitions shuffle partitions
- * @return return if shuffle partitions bigger than limit
+ * @return return true if shuffle partitions bigger than limit, otherwise
false
*/
def applyShufflePartitionsFallbackPolicy(numPartitions: Int): Boolean = {
- val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
- val needFallback = numPartitions >= confNumPartitions
+ val needFallback = numPartitions >= numPartitionsThreshold
if (needFallback) {
- logWarning(s"Shuffle num of partitions: $numPartitions" +
- s" is bigger than the limit: $confNumPartitions," +
- s" need fallback to spark shuffle")
+ logWarning(
+ s"Shuffle partition number: $numPartitions exceeds threshold:
$numPartitionsThreshold, " +
+ "need to fallback to spark built-in shuffle implementation.")
}
needFallback
}
/**
- * If celeborn cluster is exceed current user's quota, fallback to external
shuffle
+ * If celeborn cluster exceeds current user's quota, fallback to spark
built-in shuffle implementation
*
* @return if celeborn cluster have available space for current user
*/
def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
- if (!conf.quotaEnabled) {
+ if (!quotaEnabled) {
return true
}
@@ -83,14 +88,19 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
}
/**
- * If celeborn cluster has no available workers, fallback to external
shuffle.
+ * If celeborn cluster has no available workers, fallback to spark built-in
shuffle implementation
*
* @return if celeborn cluster has available workers.
*/
def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+ if (!checkWorkerEnabled) {
+ return true
+ }
+
val resp = lifecycleManager.checkWorkersAvailable()
if (!resp.getAvailable) {
- logWarning(s"No workers available for current user
${lifecycleManager.getUserIdentifier}.")
+ logWarning(
+ s"No celeborn workers available for current user
${lifecycleManager.getUserIdentifier}.")
}
resp.getAvailable
}
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 0147f6cfc..48e0825e6 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -25,52 +25,57 @@ import org.apache.celeborn.common.protocol.FallbackPolicy
class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
+ private val checkWorkerEnabled = conf.checkWorkerEnabled
+ private val quotaEnabled = conf.quotaEnabled
+ private val numPartitionsThreshold = conf.shuffleFallbackPartitionThreshold
def applyAllFallbackPolicy(lifecycleManager: LifecycleManager,
numPartitions: Int): Boolean = {
val needFallback =
applyForceFallbackPolicy() ||
applyShufflePartitionsFallbackPolicy(numPartitions) ||
!checkQuota(lifecycleManager) ||
!checkWorkersAvailable(lifecycleManager)
if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
- throw new CelebornIOException("Fallback to Spark's default shuffle is
prohibited.")
+ throw new CelebornIOException(
+ "Fallback to spark built-in shuffle implementation is prohibited.")
}
needFallback
}
/**
- * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to
external shuffle
- * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
+ * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to
spark built-in shuffle implementation
+ * @return return true if celeborn.client.spark.shuffle.fallback.policy is
ALWAYS, otherwise false
*/
def applyForceFallbackPolicy(): Boolean = {
if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
logWarning(
- s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.ALWAYS.name}, which will force fallback.")
+ s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.ALWAYS.name}, " +
+ s"forcibly fallback to spark built-in shuffle implementation.")
}
FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
}
/**
- * if shuffle partitions >
celeborn.shuffle.forceFallback.numPartitionsThreshold, fallback to external
shuffle
+ * if shuffle partitions > celeborn.shuffle.fallback.numPartitionsThreshold,
fallback to spark built-in
+ * shuffle implementation
* @param numPartitions shuffle partitions
- * @return return if shuffle partitions bigger than limit
+ * @return return true if shuffle partitions bigger than limit, otherwise
false
*/
def applyShufflePartitionsFallbackPolicy(numPartitions: Int): Boolean = {
- val confNumPartitions = conf.shuffleForceFallbackPartitionThreshold
- val needFallback = numPartitions >= confNumPartitions
+ val needFallback = numPartitions >= numPartitionsThreshold
if (needFallback) {
- logWarning(s"Shuffle num of partitions: $numPartitions" +
- s" is bigger than the limit: $confNumPartitions," +
- s" need fallback to spark shuffle")
+ logWarning(
+ s"Shuffle partition number: $numPartitions exceeds threshold:
$numPartitionsThreshold, " +
+ "need to fallback to spark built-in shuffle implementation.")
}
needFallback
}
/**
- * If celeborn cluster is exceed current user's quota, fallback to external
shuffle
+ * If celeborn cluster exceeds current user's quota, fallback to spark
built-in shuffle implementation
*
* @return if celeborn cluster have available space for current user
*/
def checkQuota(lifecycleManager: LifecycleManager): Boolean = {
- if (!conf.quotaEnabled) {
+ if (!quotaEnabled) {
return true
}
@@ -83,18 +88,19 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
}
/**
- * If celeborn cluster has no available workers, fallback to external
shuffle.
+ * If celeborn cluster has no available workers, fallback to spark built-in
shuffle implementation
*
* @return if celeborn cluster has available workers.
*/
def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
- if (!conf.checkWorkerEnabled) {
+ if (!checkWorkerEnabled) {
return true
}
val resp = lifecycleManager.checkWorkersAvailable()
if (!resp.getAvailable) {
- logWarning(s"No workers available for current user
${lifecycleManager.getUserIdentifier}.")
+ logWarning(
+ s"No celeborn workers available for current user
${lifecycleManager.getUserIdentifier}.")
}
resp.getAvailable
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index d43dab053..13eaed0b9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1012,8 +1012,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
}
}
- def shuffleForceFallbackPartitionThreshold: Long =
- get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
+ def shuffleFallbackPartitionThreshold: Long =
get(SPARK_SHUFFLE_FALLBACK_PARTITION_THRESHOLD)
def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
@@ -4379,10 +4378,13 @@ object CelebornConf extends Logging {
buildConf("celeborn.client.spark.shuffle.fallback.policy")
.categories("client")
.version("0.5.0")
- .doc(
- s"Celeborn supports the following kind of fallback policies. 1.
${FallbackPolicy.ALWAYS.name}: force fallback shuffle to Spark's default; " +
- s"2. ${FallbackPolicy.AUTO.name}: consider other factors like
availability of enough workers and quota, or whether shuffle of partition
number is lower than
celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold; " +
- s"3. ${FallbackPolicy.NEVER.name}: the job will fail if it is
concluded that fallback is required based on factors above.")
+ .doc("Celeborn supports the following kind of fallback policies. " +
+ s"1. ${FallbackPolicy.ALWAYS.name}: always use spark built-in shuffle
implementation; " +
+ s"2. ${FallbackPolicy.AUTO.name}: prefer to use celeborn shuffle
implementation, and fallback to use spark " +
+ "built-in shuffle implementation based on certain factors, e.g.
availability of enough workers and quota, " +
+ "shuffle partition number; " +
+ s"3. ${FallbackPolicy.NEVER.name}: always use celeborn shuffle
implementation, and fail fast when it is " +
+ "concluded that fallback is required based on factors above.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(
@@ -4396,17 +4398,20 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.shuffle.forceFallback.enabled")
.categories("client")
.version("0.3.0")
- .doc(s"Whether force fallback shuffle to Spark's default. This
configuration only takes effect when
${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.AUTO.name}.")
+ .doc("Always use spark built-in shuffle implementation. This
configuration is deprecated, " +
+ s"consider configuring
`${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key}` instead.")
.booleanConf
.createWithDefault(false)
- val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
-
buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
+ val SPARK_SHUFFLE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
+ buildConf("celeborn.client.spark.shuffle.fallback.numPartitionsThreshold")
.withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
+
.withAlternative("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
.categories("client")
- .version("0.3.0")
- .doc(
- s"Celeborn will only accept shuffle of partition number lower than
this configuration value. This configuration only takes effect when
${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is
${FallbackPolicy.AUTO.name}.")
+ .version("0.5.0")
+ .doc("Celeborn will only accept shuffle of partition number lower than
this configuration value. " +
+ s"This configuration only takes effect when
`${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key}` " +
+ s"is `${FallbackPolicy.AUTO.name}`.")
.longConf
.createWithDefault(Int.MaxValue)
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 314cd90ae..0fe90d0ec 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -109,9 +109,9 @@ license: |
| celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false
| Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 | |
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | false | This
is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If
you have changed UnsafeRow's memory layout set this to false. | 0.2.2 | |
| celeborn.client.spark.shuffle.checkWorker.enabled | true | false | When
true, before registering shuffle, LifecycleManager should check if current
cluster have available workers, if cluster don't have available workers,
fallback to Spark's default shuffle | 0.5.0 | |
-| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn
supports the following kind of fallback policies. 1. ALWAYS: force fallback
shuffle to Spark's default; 2. AUTO: consider other factors like availability
of enough workers and quota, or whether shuffle of partition number is lower
than celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold; 3.
NEVER: the job will fail if it is concluded that fallback is required based on
factors above. | 0.5.0 | |
-| celeborn.client.spark.shuffle.forceFallback.enabled | false | false |
Whether force fallback shuffle to Spark's default. This configuration only
takes effect when celeborn.client.spark.shuffle.fallback.policy is AUTO. |
0.3.0 | celeborn.shuffle.forceFallback.enabled |
-| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | false | Celeborn will only accept shuffle of partition number
lower than this configuration value. This configuration only takes effect when
celeborn.client.spark.shuffle.fallback.policy is AUTO. | 0.3.0 |
celeborn.shuffle.forceFallback.numPartitionsThreshold |
+| celeborn.client.spark.shuffle.fallback.numPartitionsThreshold | 2147483647 |
false | Celeborn will only accept shuffle of partition number lower than this
configuration value. This configuration only takes effect when
`celeborn.client.spark.shuffle.fallback.policy` is `AUTO`. | 0.5.0 |
celeborn.shuffle.forceFallback.numPartitionsThreshold,celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold
|
+| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn
supports the following kind of fallback policies. 1. ALWAYS: always use spark
built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle
implementation, and fallback to use spark built-in shuffle implementation based
on certain factors, e.g. availability of enough workers and quota, shuffle
partition number; 3. NEVER: always use celeborn shuffle implementation, and
fail fast when it is concluded th [...]
+| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always
use spark built-in shuffle implementation. This configuration is deprecated,
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. |
0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge.
This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 |
celeborn.shuffle.writer |
| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 | |
| celeborn.quota.enabled | true | false | When Master side sets to true, the
master will enable to check the quota via QuotaManager. When Client side sets
to true, LifecycleManager will request Master side to check whether the current
user has enough quota before registration of shuffle. Fallback to the default
shuffle service of Spark when Master side checks that there is no enough quota
for current user. | 0.2.0 | |