This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e51b0c4f8 [CELEBORN-1642][CIP-11] Support multiple worker tags
e51b0c4f8 is described below

commit e51b0c4f86fca391e9fa70a1fd7fbf445937ac04
Author: Sanskar Modi <[email protected]>
AuthorDate: Mon Oct 28 10:18:43 2024 +0800

    [CELEBORN-1642][CIP-11] Support multiple worker tags
    
    ### What changes were proposed in this pull request?
    Previously, the TagsManager code supported only a single tag for selecting
    tagged workers. This change enables passing multiple tags to TagsManager.
    Multiple tags are evaluated as an "AND" expression, i.e., only workers
    tagged with all of the passed tags are selected.
    
    Support for more schemes will be added in follow-up PRs.
    
    ### Why are the changes needed?
    
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UTs
    
    Closes #2850 from s0nskar/CELEBORN-1642.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../service/deploy/master/tags/TagsManager.scala   | 26 +++++++++++++--------
 .../deploy/master/tags/TagsManagerSuite.scala      | 27 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
index e80534d48..3b421dff8 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.service.deploy.master.tags
 
 import java.util
-import java.util.{Set => JSet}
+import java.util.{Collections, Set => JSet}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Predicate
 import java.util.stream.Collectors
@@ -43,19 +43,27 @@ class TagsManager extends Logging {
 
     if (tags.isEmpty) {
       logWarning("No tags provided")
-      return new util.ArrayList[WorkerInfo]()
+      return Collections.emptyList()
     }
 
-    // TODO: Support multiple tags (CELEBORN-1642)
-    val tag = tags(0)
-    val workersForTag = tagStore.get(tag)
-    if (workersForTag == null) {
-      logWarning(s"Tag $tag not found in cluster")
-      return new util.ArrayList[WorkerInfo]()
+    var workersForTags: Option[JSet[String]] = None
+    tags.foreach { tag =>
+      val taggedWorkers = tagStore.getOrDefault(tag, Collections.emptySet())
+      workersForTags match {
+        case Some(w) =>
+          w.retainAll(taggedWorkers)
+        case _ =>
+          workersForTags = Some(taggedWorkers)
+      }
+    }
+
+    if (workersForTags.isEmpty) {
+      logWarning(s"No workers for tags: $tagExpr found in cluster")
+      return Collections.emptyList()
     }
 
     val workerTagsPredicate = new Predicate[WorkerInfo] {
-      override def test(w: WorkerInfo): Boolean = 
workersForTag.contains(w.toUniqueId())
+      override def test(w: WorkerInfo): Boolean = 
workersForTags.get.contains(w.toUniqueId())
     }
     workers.stream().filter(workerTagsPredicate).collect(Collectors.toList())
   }
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
index 544152ea7..a77796f52 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
@@ -122,4 +122,31 @@ class TagsManagerSuite extends AnyFunSuite
       assert(tags.contains(TAG2))
     }
   }
+
+  test("test tags expression with multiple tags") {
+    tagsManager = new TagsManager()
+
+    // Tag1
+    val TAG1 = "tag1"
+    tagsManager.addTagToWorker(TAG1, WORKER1.toUniqueId())
+    tagsManager.addTagToWorker(TAG1, WORKER2.toUniqueId())
+
+    // Tag2
+    val TAG2 = "tag2"
+    tagsManager.addTagToWorker(TAG2, WORKER2.toUniqueId())
+    tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())
+
+    {
+      val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag2", workers)
+      assert(taggedWorkers.size == 1)
+      assert(!taggedWorkers.contains(WORKER1))
+      assert(taggedWorkers.contains(WORKER2))
+      assert(!taggedWorkers.contains(WORKER3))
+    }
+
+    {
+      val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag3", workers)
+      assert(taggedWorkers.size == 0)
+    }
+  }
 }

Reply via email to