This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3fc74ec06 [CELEBORN-1597][CIP-11] Implement TagsManager
3fc74ec06 is described below
commit 3fc74ec06b0753da04253c5a465be7b541583446
Author: Sanskar Modi <[email protected]>
AuthorDate: Fri Sep 27 15:20:24 2024 +0800
[CELEBORN-1597][CIP-11] Implement TagsManager
### What changes were proposed in this pull request?
Added a tags manager that is responsible for managing worker tags. It
will be used in follow-up PRs.
### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
Added unit tests (TagsManagerSuite).
Closes #2739 from s0nskar/tags_manager.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../service/deploy/master/tags/TagsManager.scala | 80 ++++++++++++++
.../deploy/master/tags/TagsManagerSuite.scala | 123 +++++++++++++++++++++
2 files changed, 203 insertions(+)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
new file mode 100644
index 000000000..6dd6e43e2
--- /dev/null
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.tags
+
+import java.util
+import java.util.{Set => JSet}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters.{asScalaIteratorConverter,
mapAsScalaConcurrentMapConverter}
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+
+/**
+ * In-memory registry of worker tags.
+ *
+ * Every tag maps to the set of unique ids of the workers that carry it. The outer map is a
+ * `ConcurrentHashMap` and each per-tag set is created with `ConcurrentHashMap.newKeySet`.
+ */
+class TagsManager extends Logging {
+  // tag -> unique ids of the workers carrying that tag
+  private val tagStore = new ConcurrentHashMap[String, JSet[String]]()
+
+  // Factory handed to computeIfAbsent: builds the empty worker-id set for a new tag.
+  private val addNewTagFunc = new util.function.Function[String, JSet[String]]() {
+    override def apply(t: String): JSet[String] = ConcurrentHashMap.newKeySet[String]()
+  }
+
+  /**
+   * Filters `workers` down to those carrying `tag`.
+   *
+   * @param tag the tag to look up
+   * @param workers candidate workers; their relative order is kept in the result
+   * @return the tagged subset of `workers`, or an empty list (plus a warning in the log)
+   *         when the tag is unknown
+   */
+  def getTaggedWorkers(tag: String, workers: List[WorkerInfo]): List[WorkerInfo] = {
+    Option(tagStore.get(tag)) match {
+      case Some(workersForTag) =>
+        workers.filter(worker => workersForTag.contains(worker.toUniqueId()))
+      case None =>
+        logWarning(s"Tag $tag not found in cluster")
+        List.empty
+    }
+  }
+
+  /**
+   * Attaches `tag` to the worker identified by `workerId`, creating the tag if needed.
+   *
+   * @param tag the tag to attach
+   * @param workerId unique id of the worker
+   */
+  def addTagToWorker(tag: String, workerId: String): Unit = {
+    val workers = tagStore.computeIfAbsent(tag, addNewTagFunc)
+    logInfo(s"Adding Tag $tag to worker $workerId")
+    workers.add(workerId)
+  }
+
+  /**
+   * Detaches `tag` from the worker identified by `workerId`.
+   *
+   * Logs a warning when the tag is unknown or the worker does not carry it. The tag itself
+   * stays registered even when its last worker is removed.
+   *
+   * @param tag the tag to detach
+   * @param workerId unique id of the worker
+   */
+  def removeTagFromWorker(tag: String, workerId: String): Unit = {
+    val workers = tagStore.get(tag)
+
+    // Set.remove reports whether the element was present, so the outcome is decided by a
+    // single call instead of a separate contains() check followed by remove().
+    if (workers != null && workers.remove(workerId)) {
+      logInfo(s"Removing Tag $tag from worker $workerId")
+    } else {
+      logWarning(s"Tag $tag not found for worker $workerId")
+    }
+  }
+
+  /**
+   * Collects every tag attached to `worker`.
+   *
+   * @param worker the worker to look up
+   * @return the tags whose worker-id set contains the worker's unique id; empty if none
+   */
+  def getTagsForWorker(worker: WorkerInfo): Set[String] = {
+    // Compute the id once instead of once per tag.
+    val workerId = worker.toUniqueId()
+    tagStore.asScala.filter(_._2.contains(workerId)).keySet.toSet
+  }
+
+  /**
+   * Drops `tag` together with all of its worker associations.
+   *
+   * Logs a warning when the tag is unknown.
+   *
+   * @param tag the tag to remove
+   */
+  def removeTagFromCluster(tag: String): Unit = {
+    val workers = tagStore.remove(tag)
+    if (workers != null) {
+      val removedWorkers = workers.toArray.mkString(", ")
+      logInfo(s"Removed Tag $tag from cluster with workers $removedWorkers")
+    } else {
+      logWarning(s"Tag $tag not found in cluster and thus can not be removed")
+    }
+  }
+
+  /**
+   * @return an immutable snapshot of all tags currently registered
+   */
+  def getTagsForCluster: Set[String] = {
+    tagStore.keySet().iterator().asScala.toSet
+  }
+}
diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
new file mode 100644
index 000000000..5f444a816
--- /dev/null
+++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.tags
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+
+/**
+ * Walks a [[TagsManager]] through tagging workers, querying by tag and by worker, and
+ * removing tags from a single worker and from the whole cluster.
+ */
+class TagsManagerSuite extends AnyFunSuite
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Logging {
+  protected var tagsManager: TagsManager = _
+
+  val WORKER1 = new WorkerInfo("host1", 111, 112, 113, 114, 115)
+  val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214, 215)
+  val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314, 315)
+
+  val workers = List(WORKER1, WORKER2, WORKER3)
+
+  // Checks that, out of `workers`, exactly the `expected` ones are reported for `tag`.
+  private def assertTaggedWorkers(tag: String, expected: WorkerInfo*): Unit = {
+    val tagged = tagsManager.getTaggedWorkers(tag, workers)
+    assert(tagged.size == expected.size)
+    workers.foreach { candidate =>
+      assert(tagged.contains(candidate) == expected.contains(candidate))
+    }
+  }
+
+  // Checks that `actual` consists of exactly the `expected` tags.
+  private def assertTags(actual: Set[String], expected: String*): Unit = {
+    assert(actual.size == expected.size)
+    expected.foreach(tag => assert(actual.contains(tag)))
+  }
+
+  test("test tags manager") {
+    tagsManager = new TagsManager()
+
+    val firstTag = "tag1"
+    val secondTag = "tag2"
+
+    // tag1 -> worker1, worker2; tag2 -> worker2, worker3
+    Seq(WORKER1, WORKER2).foreach(w => tagsManager.addTagToWorker(firstTag, w.toUniqueId()))
+    Seq(WORKER2, WORKER3).foreach(w => tagsManager.addTagToWorker(secondTag, w.toUniqueId()))
+
+    // Lookup by tag
+    assertTaggedWorkers(firstTag, WORKER1, WORKER2)
+    assertTaggedWorkers(secondTag, WORKER2, WORKER3)
+
+    // All tags known to the cluster
+    assertTags(tagsManager.getTagsForCluster, firstTag, secondTag)
+
+    // An unknown tag matches no worker
+    assert(tagsManager.getTaggedWorkers("unknown-tag", workers).isEmpty)
+
+    // Lookup by worker
+    assertTags(tagsManager.getTagsForWorker(WORKER1), firstTag)
+    assertTags(tagsManager.getTagsForWorker(WORKER2), firstTag, secondTag)
+    assertTags(tagsManager.getTagsForWorker(WORKER3), secondTag)
+
+    // A worker that was never tagged has no tags
+    val untaggedWorker = new WorkerInfo("host4", 999, 999, 999, 999, 999)
+    assert(tagsManager.getTagsForWorker(untaggedWorker).isEmpty)
+
+    // Removing a tag from one worker leaves the other workers of that tag untouched
+    tagsManager.removeTagFromWorker(firstTag, WORKER2.toUniqueId())
+    assertTaggedWorkers(firstTag, WORKER1)
+
+    // Removing a tag from the cluster drops it entirely
+    tagsManager.removeTagFromCluster(firstTag)
+    assert(tagsManager.getTaggedWorkers(firstTag, workers).isEmpty)
+    assertTags(tagsManager.getTagsForCluster, secondTag)
+  }
+}