This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 259dfcd98 [CELEBORN-1621][FOLLOWUP] Support enabling worker tags via 
config
259dfcd98 is described below

commit 259dfcd988408e64f83e539b6469b8f87ca4a9f7
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu Nov 28 11:22:35 2024 +0800

    [CELEBORN-1621][FOLLOWUP] Support enabling worker tags via config
    
    ### What changes were proposed in this pull request?
    
    - Add support for enabling/disabling the worker tags feature via a master 
config flag.
    - Fix bug: after #2936, admins can also define the tagsExpr for users. If a 
user passed an empty tagsExpr, the code previously ignored the admin-defined 
tagsExpr and allowed the job to use all workers.
    
    ### Why are the changes needed?
    
    
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
    
    ### Does this PR introduce _any_ user-facing change?
    
    NA
    
    ### How was this patch tested?
    Existing UTs
    
    Closes #2953 from s0nskar/tags-enabled.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
 docs/configuration/master.md                                     | 1 +
 .../scala/org/apache/celeborn/service/deploy/master/Master.scala | 2 +-
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c8274ae80..c914e404b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1346,6 +1346,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
   def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
 
+  def tagsEnabled: Boolean = get(TAGS_ENABLED)
   def tagsExpr: String = get(TAGS_EXPR)
   def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)
 
@@ -5975,6 +5976,14 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val TAGS_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.tags.enabled")
+      .categories("master")
+      .doc("Whether to enable tags for workers.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val TAGS_EXPR: ConfigEntry[String] =
     buildConf("celeborn.tags.tagsExpr")
       .categories("master", "client")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index dfef34107..bdb9a791a 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -91,6 +91,7 @@ license: |
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 
0.6.0 |  | 
 | celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the 
tags expression provided by the client over the tags expression provided by the 
master. | 0.6.0 |  | 
 | celeborn.tags.tagsExpr |  | true | Expression to filter workers by tags. The 
expression is a comma-separated list of tags. The expression is evaluated as a 
logical AND of all tags. For example, `prod,high-io` filters workers that have 
both the `prod` and `high-io` tags. | 0.6.0 |  | 
 <!--end-include-->
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 2859ce3c5..e0328ac6d 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -857,7 +857,7 @@ private[celeborn] class Master(
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
     var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
-    if (requestSlots.tagsExpr.nonEmpty) {
+    if (conf.tagsEnabled) {
       availableWorkers = tagsManager.getTaggedWorkers(
         requestSlots.userIdentifier,
         requestSlots.tagsExpr,

Reply via email to