This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 259dfcd98 [CELEBORN-1621][FOLLOWUP] Support enabling worker tags via
config
259dfcd98 is described below
commit 259dfcd988408e64f83e539b6469b8f87ca4a9f7
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu Nov 28 11:22:35 2024 +0800
[CELEBORN-1621][FOLLOWUP] Support enabling worker tags via config
### What changes were proposed in this pull request?
- Add support for enabling/disabling the worker tags feature via a master config
flag.
- Bug fix: since #2936, admins can also define the tagsExpr for users. When a
user passes an empty tagsExpr, the current code ignores the admin-defined
tagsExpr and allows the job to use all workers.
### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
Existing UTs
Closes #2953 from s0nskar/tags-enabled.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/master.md | 1 +
.../scala/org/apache/celeborn/service/deploy/master/Master.scala | 2 +-
3 files changed, 11 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c8274ae80..c914e404b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1346,6 +1346,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
+ def tagsEnabled: Boolean = get(TAGS_ENABLED)
def tagsExpr: String = get(TAGS_EXPR)
def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)
@@ -5975,6 +5976,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val TAGS_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.tags.enabled")
+ .categories("master")
+ .doc("Whether to enable tags for workers.")
+ .version("0.6.0")
+ .booleanConf
+ .createWithDefault(true)
+
val TAGS_EXPR: ConfigEntry[String] =
buildConf("celeborn.tags.tagsExpr")
.categories("master", "client")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index dfef34107..bdb9a791a 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -91,6 +91,7 @@ license: |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.tags.enabled | true | false | Whether to enable tags for workers. |
0.6.0 | |
| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the
tags expression provided by the client over the tags expression provided by the
master. | 0.6.0 | |
| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The
expression is a comma-separated list of tags. The expression is evaluated as a
logical AND of all tags. For example, `prod,high-io` filters workers that have
both the `prod` and `high-io` tags. | 0.6.0 | |
<!--end-include-->
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 2859ce3c5..e0328ac6d 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -857,7 +857,7 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
- if (requestSlots.tagsExpr.nonEmpty) {
+ if (conf.tagsEnabled) {
availableWorkers = tagsManager.getTaggedWorkers(
requestSlots.userIdentifier,
requestSlots.tagsExpr,