This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fc74ec06 [CELEBORN-1597][CIP-11] Implement TagsManager
3fc74ec06 is described below

commit 3fc74ec06b0753da04253c5a465be7b541583446
Author: Sanskar Modi <[email protected]>
AuthorDate: Fri Sep 27 15:20:24 2024 +0800

    [CELEBORN-1597][CIP-11] Implement TagsManager
    
    ### What changes were proposed in this pull request?
    
    Added a tags manager, which will be responsible for managing worker tags. It
    will be used in follow-up PRs.
    
    ### Why are the changes needed?
    
    
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
    
    ### Does this PR introduce _any_ user-facing change?
    
    NA
    
    ### How was this patch tested?
    Added UTs
    
    Closes #2739 from s0nskar/tags_manager.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../service/deploy/master/tags/TagsManager.scala   |  80 ++++++++++++++
 .../deploy/master/tags/TagsManagerSuite.scala      | 123 +++++++++++++++++++++
 2 files changed, 203 insertions(+)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
new file mode 100644
index 000000000..6dd6e43e2
--- /dev/null
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.tags
+
+import java.util
+import java.util.{Set => JSet}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters.{asScalaIteratorConverter, 
mapAsScalaConcurrentMapConverter}
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+
+/**
+ * Keeps track of which workers carry which tags.
+ *
+ * Internally this is a map from tag name to the set of worker ids carrying that tag. Both the
+ * map and the per-tag sets are concurrent collections (`ConcurrentHashMap` and
+ * `ConcurrentHashMap.newKeySet`). Lookups compare against `WorkerInfo.toUniqueId()`, so the
+ * `workerId` values passed to [[addTagToWorker]] / [[removeTagFromWorker]] are expected to be
+ * such unique ids.
+ */
+class TagsManager extends Logging {
+  // tag -> unique ids of the workers carrying that tag.
+  private val tagStore = new ConcurrentHashMap[String, JSet[String]]()
+
+  // Creates the concurrent worker-id set for a tag the first time the tag is used.
+  private val addNewTagFunc =
+    new util.function.Function[String, ConcurrentHashMap.KeySetView[String, java.lang.Boolean]]() {
+      override def apply(t: String): ConcurrentHashMap.KeySetView[String, java.lang.Boolean] =
+        ConcurrentHashMap.newKeySet[String]()
+    }
+
+  /**
+   * Returns the subset of `workers` carrying `tag`, in the same order as `workers`.
+   *
+   * @param tag     tag to look up
+   * @param workers candidate workers to filter
+   * @return the tagged workers, or an empty list (with a warning logged) if the tag is unknown
+   */
+  def getTaggedWorkers(tag: String, workers: List[WorkerInfo]): List[WorkerInfo] = {
+    Option(tagStore.get(tag)) match {
+      case Some(workersForTag) =>
+        workers.filter(worker => workersForTag.contains(worker.toUniqueId()))
+      case None =>
+        logWarning(s"Tag $tag not found in cluster")
+        List.empty
+    }
+  }
+
+  /**
+   * Adds `tag` to the worker identified by `workerId`, registering the tag in the cluster if
+   * it does not exist yet. Adding a tag the worker already carries has no further effect.
+   */
+  def addTagToWorker(tag: String, workerId: String): Unit = {
+    val workers = tagStore.computeIfAbsent(tag, addNewTagFunc)
+    logInfo(s"Adding Tag $tag to worker $workerId")
+    workers.add(workerId)
+  }
+
+  /**
+   * Removes `tag` from the worker identified by `workerId`.
+   *
+   * Logs a warning if the tag does not exist or the worker does not carry it. The tag stays
+   * registered in the cluster even after its last worker is removed; use
+   * [[removeTagFromCluster]] to drop the tag itself.
+   */
+  def removeTagFromWorker(tag: String, workerId: String): Unit = {
+    val workers = tagStore.get(tag)
+    // Rely on the boolean result of `remove` instead of a separate `contains` check, so that
+    // testing membership and removing is one atomic operation on the concurrent set.
+    if (workers != null && workers.remove(workerId)) {
+      logInfo(s"Removing Tag $tag from worker $workerId")
+    } else {
+      logWarning(s"Tag $tag not found for worker $workerId")
+    }
+  }
+
+  /** Returns every tag currently carried by `worker` (empty if the worker has no tags). */
+  def getTagsForWorker(worker: WorkerInfo): Set[String] = {
+    // Computed once instead of once per tag entry.
+    val workerId = worker.toUniqueId()
+    tagStore.asScala.collect { case (tag, workers) if workers.contains(workerId) => tag }.toSet
+  }
+
+  /**
+   * Removes `tag` from the cluster, untagging every worker that carried it. Logs a warning if
+   * the tag does not exist.
+   */
+  def removeTagFromCluster(tag: String): Unit = {
+    val workers = tagStore.remove(tag)
+    if (workers != null) {
+      logInfo(s"Removed Tag $tag from cluster with workers ${workers.toArray.mkString(", ")}")
+    } else {
+      logWarning(s"Tag $tag not found in cluster and thus can not be removed")
+    }
+  }
+
+  /** Returns the names of all tags currently registered in the cluster. */
+  def getTagsForCluster: Set[String] = {
+    tagStore.keySet().iterator().asScala.toSet
+  }
+}
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
new file mode 100644
index 000000000..5f444a816
--- /dev/null
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.tags
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+
+/**
+ * Unit tests for [[TagsManager]]. Every test starts from a fresh manager populated with:
+ *   tag1 -> worker1, worker2
+ *   tag2 -> worker2, worker3
+ */
+class TagsManagerSuite extends AnyFunSuite
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Logging {
+  protected var tagsManager: TagsManager = _
+
+  val WORKER1 = new WorkerInfo("host1", 111, 112, 113, 114, 115)
+  val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214, 215)
+  val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314, 315)
+
+  val workers = List(WORKER1, WORKER2, WORKER3)
+
+  val TAG1 = "tag1"
+  val TAG2 = "tag2"
+
+  // A fresh fixture per test keeps the tests independent of each other's mutations.
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    tagsManager = new TagsManager()
+    tagsManager.addTagToWorker(TAG1, WORKER1.toUniqueId())
+    tagsManager.addTagToWorker(TAG1, WORKER2.toUniqueId())
+    tagsManager.addTagToWorker(TAG2, WORKER2.toUniqueId())
+    tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())
+  }
+
+  test("getTaggedWorkers returns only the workers carrying the tag") {
+    val tag1Workers = tagsManager.getTaggedWorkers(TAG1, workers)
+    assert(tag1Workers.size == 2)
+    assert(tag1Workers.contains(WORKER1))
+    assert(tag1Workers.contains(WORKER2))
+    assert(!tag1Workers.contains(WORKER3))
+
+    val tag2Workers = tagsManager.getTaggedWorkers(TAG2, workers)
+    assert(tag2Workers.size == 2)
+    assert(!tag2Workers.contains(WORKER1))
+    assert(tag2Workers.contains(WORKER2))
+    assert(tag2Workers.contains(WORKER3))
+  }
+
+  test("getTaggedWorkers returns no workers for an unknown tag") {
+    assert(tagsManager.getTaggedWorkers("unknown-tag", workers).isEmpty)
+  }
+
+  test("getTagsForCluster returns every registered tag") {
+    val tags = tagsManager.getTagsForCluster
+    assert(tags.size == 2)
+    assert(tags.contains(TAG1))
+    assert(tags.contains(TAG2))
+  }
+
+  test("getTagsForWorker returns the tags carried by a worker") {
+    val tagsWorker1 = tagsManager.getTagsForWorker(WORKER1)
+    assert(tagsWorker1.size == 1)
+    assert(tagsWorker1.contains(TAG1))
+
+    val tagsWorker2 = tagsManager.getTagsForWorker(WORKER2)
+    assert(tagsWorker2.size == 2)
+    assert(tagsWorker2.contains(TAG1))
+    assert(tagsWorker2.contains(TAG2))
+
+    val tagsWorker3 = tagsManager.getTagsForWorker(WORKER3)
+    assert(tagsWorker3.size == 1)
+    assert(tagsWorker3.contains(TAG2))
+
+    val untaggedWorker = new WorkerInfo("host4", 999, 999, 999, 999, 999)
+    assert(tagsManager.getTagsForWorker(untaggedWorker).isEmpty)
+  }
+
+  test("addTagToWorker is idempotent") {
+    tagsManager.addTagToWorker(TAG1, WORKER1.toUniqueId())
+    assert(tagsManager.getTaggedWorkers(TAG1, workers).size == 2)
+    assert(tagsManager.getTagsForWorker(WORKER1) == Set(TAG1))
+  }
+
+  test("removeTagFromWorker untags only the given worker") {
+    tagsManager.removeTagFromWorker(TAG1, WORKER2.toUniqueId())
+    val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+    assert(taggedWorkers.size == 1)
+    assert(taggedWorkers.contains(WORKER1))
+    assert(!taggedWorkers.contains(WORKER2))
+    assert(!taggedWorkers.contains(WORKER3))
+
+    // The other tag of WORKER2 is untouched.
+    assert(tagsManager.getTagsForWorker(WORKER2) == Set(TAG2))
+  }
+
+  test("removeTagFromWorker is a no-op for an unknown tag or an untagged worker") {
+    tagsManager.removeTagFromWorker("unknown-tag", WORKER1.toUniqueId())
+    tagsManager.removeTagFromWorker(TAG1, WORKER3.toUniqueId())
+
+    assert(tagsManager.getTaggedWorkers(TAG1, workers).size == 2)
+    assert(tagsManager.getTaggedWorkers(TAG2, workers).size == 2)
+    assert(tagsManager.getTagsForCluster == Set(TAG1, TAG2))
+  }
+
+  test("removeTagFromCluster drops the tag and untags its workers") {
+    tagsManager.removeTagFromCluster(TAG1)
+    assert(tagsManager.getTaggedWorkers(TAG1, workers).isEmpty)
+
+    val tags = tagsManager.getTagsForCluster
+    assert(tags.size == 1)
+    assert(tags.contains(TAG2))
+  }
+
+  test("removeTagFromCluster is a no-op for an unknown tag") {
+    tagsManager.removeTagFromCluster("unknown-tag")
+    assert(tagsManager.getTagsForCluster == Set(TAG1, TAG2))
+  }
+}

Reply via email to