This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c1837536a [CELEBORN-1267] Add config to control worker check in
CelebornShuffleFallbackPolicyRunner
c1837536a is described below
commit c1837536a159454287e16664bc6be9b90a09a023
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 19:01:41 2024 +0800
[CELEBORN-1267] Add config to control worker check in
CelebornShuffleFallbackPolicyRunner
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
In some scenarios where Celeborn cannot be used, users want to report an
error directly instead of falling back to Spark's default shuffle.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI
Closes #2291 from kerwin-zk/add-config.
Authored-by: xiyu.zk <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/CelebornShuffleFallbackPolicyRunner.scala | 4 ++++
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++
docs/configuration/client.md | 1 +
3 files changed, 16 insertions(+)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 6248e08ff..20743467c 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
* @return if celeborn cluster has available workers.
*/
def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
+ if (!conf.checkWorkerEnabled) {
+ return true
+ }
+
val resp = lifecycleManager.checkWorkersAvailable()
if (!resp.getAvailable) {
logWarning(s"No workers available for current user
${lifecycleManager.getUserIdentifier}.")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c4aad0306..8ebba9f26 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -884,6 +884,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def shufflePartitionType: PartitionType =
PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
def shuffleRangeReadFilterEnabled: Boolean =
get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
def shuffleForceFallbackEnabled: Boolean =
get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
+ def checkWorkerEnabled: Boolean = get(CHECK_WORKER_ENABLED)
def shuffleForceFallbackPartitionThreshold: Long =
get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
@@ -4003,6 +4004,16 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val CHECK_WORKER_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.spark.shuffle.checkWorker.enabled")
+ .categories("client")
+ .doc("When true, before registering shuffle, LifecycleManager should
check " +
+ "if current cluster have available workers, if cluster don't have
available " +
+ "workers, fallback to Spark's default shuffle")
+ .version("0.5.0")
+ .booleanConf
+ .createWithDefault(true)
+
val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
.withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 3b4b8836f..e52966c93 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -106,6 +106,7 @@ license: |
| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 |
Threshold of shuffle partition number for dynamically switching push writer
mode. When the shuffle partition number is greater than this value, use the
sort-based shuffle writer for memory efficiency; otherwise use the hash-based
shuffle writer for speed. This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | |
| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0
| celeborn.push.sortMemory.threshold |
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you
have changed UnsafeRow's memory layout set this to false. | 0.2.2 | |
+| celeborn.client.spark.shuffle.checkWorker.enabled | true | When true, before
registering shuffle, LifecycleManager should check if current cluster have
available workers, if cluster don't have available workers, fallback to Spark's
default shuffle | 0.5.0 | |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force
fallback shuffle to Spark's default. | 0.3.0 |
celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | Celeborn will only accept shuffle of partition number lower than
this configuration value. | 0.3.0 |
celeborn.shuffle.forceFallback.numPartitionsThreshold |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge.
This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 |
celeborn.shuffle.writer |