This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 712d9a496 [CELEBORN-1621][CIP-11] Predefined worker tags expr via
dynamic configs
712d9a496 is described below
commit 712d9a496eabacb954c8931b419d710a79883ab5
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Nov 26 20:40:30 2024 +0800
[CELEBORN-1621][CIP-11] Predefined worker tags expr via dynamic configs
### What changes were proposed in this pull request?
Support predefined tags expressions for tenants and users via dynamic config.
With this, admins can configure tags for users/tenants and give special users
permission to provide a custom tags expression.
### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
UTs
Closes #2936 from s0nskar/admin_tags.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 2 +-
.../org/apache/celeborn/common/CelebornConf.scala | 20 ++++++++--
.../celeborn/common/tags/WorkerTagsMeta.scala | 32 ++++++++++++++++
docs/configuration/client.md | 2 +-
docs/configuration/master.md | 2 +
.../celeborn/service/deploy/master/Master.scala | 1 +
.../service/deploy/master/tags/TagsManager.scala | 23 +++++++++---
master/src/test/resources/dynamicConfig-tags.yaml | 5 ++-
.../deploy/master/tags/TagsManagerSuite.scala | 43 +++++++++++++++++-----
.../common/service/config/DynamicConfig.java | 22 ++++++++---
.../server/common/service/config/SystemConfig.java | 7 +---
11 files changed, 128 insertions(+), 31 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 47c27bced..056747e22 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -116,7 +116,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
.maximumSize(rpcCacheSize)
.build().asInstanceOf[Cache[Int, ByteBuffer]]
- private val clientTagsExpr = conf.clientTagsExpr
+ private val clientTagsExpr = conf.tagsExpr
private val mockDestroyFailure = conf.testMockDestroySlotsFailure
private val authEnabled = conf.authEnabledOnClient
private var applicationMeta: ApplicationMeta = _
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3e48d9388..b68238e83 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1347,7 +1347,8 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
- def clientTagsExpr: String = get(CLIENT_TAGS_EXPR)
+ def tagsExpr: String = get(TAGS_EXPR)
+ def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)
// //////////////////////////////////////////////////////
// kerberos //
@@ -5980,16 +5981,27 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
- val CLIENT_TAGS_EXPR: ConfigEntry[String] =
- buildConf("celeborn.client.tagsExpr")
- .categories("client")
+ val TAGS_EXPR: ConfigEntry[String] =
+ buildConf("celeborn.tags.tagsExpr")
+ .categories("master", "client")
.version("0.6.0")
.doc("Expression to filter workers by tags. The expression is a
comma-separated list of " +
"tags. The expression is evaluated as a logical AND of all tags. For
example, " +
"`prod,high-io` filters workers that have both the `prod` and
`high-io` tags.")
+ .dynamic
.stringConf
.createWithDefault("")
+ val PREFER_CLIENT_TAGS_EXPR: ConfigEntry[Boolean] =
+ buildConf("celeborn.tags.preferClientTagsExpr")
+ .categories("master")
+ .doc("When `true`, prefer the tags expression provided by the client
over the tags " +
+ "expression provided by the master.")
+ .dynamic
+ .version("0.6.0")
+ .booleanConf
+ .createWithDefault(false)
+
val MASTER_EXCLUDE_WORKER_UNHEALTHY_DISK_RATIO_THRESHOLD:
ConfigEntry[Double] =
buildConf("celeborn.master.excludeWorker.unhealthyDiskRatioThreshold")
.categories("master")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/tags/WorkerTagsMeta.scala
b/common/src/main/scala/org/apache/celeborn/common/tags/WorkerTagsMeta.scala
new file mode 100644
index 000000000..df5e669b9
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/tags/WorkerTagsMeta.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.tags
+
+import org.apache.celeborn.common.internal.Logging
+
+/**
+ * Worker tags settings carried together: a tags expression and whether the
+ * client-provided expression should be preferred over it.
+ */
+case class WorkerTagsMeta(
+    tagsExpr: String,
+    preferClientTagExpr: Boolean) extends Logging {
+  // Labels mirror the field names exactly so log output can be searched by field.
+  override def toString: String =
+    s"WorkerTagsMeta[tagsExpr=$tagsExpr, preferClientTagExpr=$preferClientTagExpr]"
+}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 2bb7a08f3..53e540425 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -120,7 +120,6 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always
use spark built-in shuffle implementation. This configuration is deprecated,
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. |
0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge.
This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 |
celeborn.shuffle.writer |
| celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable
stage rerun. If true, client throws FetchFailedException instead of
CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure |
-| celeborn.client.tagsExpr | | false | Expression to filter workers by tags.
The expression is a comma-separated list of tags. The expression is evaluated
as a logical AND of all tags. For example, `prod,high-io` filters workers that
have both the `prod` and `high-io` tags. | 0.6.0 | |
| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn clients to connect. Client uses resolver provided by
celeborn.master.endpoints.resolver to resolve the master endpoints. By default
Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver`
which take static master endpoints as input. Allowed pattern:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used [...]
| celeborn.master.endpoints.resolver |
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false |
Resolver class that can be used for discovering and updating the master
endpoints. This allows users to provide a custom master endpoint resolver
implementation. This is useful in environments where the master nodes might
change due to scaling operations or infrastructure updates. Clients need to
ensure that provided resolver class should be present in the classpath. | 0.6.0
| |
| celeborn.quota.enabled | true | false | When Master side sets to true, the
master will enable to check the quota via QuotaManager. When Client side sets
to true, LifecycleManager will request Master side to check whether the current
user has enough quota before registration of shuffle. Fallback to the default
shuffle service of Spark when Master side checks that there is no enough quota
for current user. | 0.2.0 | |
@@ -134,4 +133,5 @@ license: |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The
expression is a comma-separated list of tags. The expression is evaluated as a
logical AND of all tags. For example, `prod,high-io` filters workers that have
both the `prod` and `high-io` tags. | 0.6.0 | |
<!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 4359527c0..a39007198 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -91,4 +91,6 @@ license: |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the
tags expression provided by the client over the tags expression provided by the
master. | 0.6.0 | |
+| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The
expression is a comma-separated list of tags. The expression is evaluated as a
logical AND of all tags. For example, `prod,high-io` filters workers that have
both the `prod` and `high-io` tags. | 0.6.0 | |
<!--end-include-->
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index de749a7b5..f394ab3e0 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -863,6 +863,7 @@ private[celeborn] class Master(
var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
if (requestSlots.tagsExpr.nonEmpty) {
availableWorkers = tagsManager.getTaggedWorkers(
+ requestSlots.userIdentifier,
requestSlots.tagsExpr,
availableWorkers)
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
index fdc6442b2..b03426732 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -25,6 +25,7 @@ import java.util.stream.Collectors
import scala.collection.JavaConverters.{asScalaIteratorConverter,
mapAsScalaConcurrentMapConverter}
+import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.common.util.JavaUtils
@@ -50,15 +51,27 @@ class TagsManager(configService: Option[ConfigService])
extends Logging {
}
def getTaggedWorkers(
- tagExpr: String,
+ userIdentifier: UserIdentifier,
+ clientTagsExpr: String,
workers: util.List[WorkerInfo]): util.List[WorkerInfo] = {
- val tags = tagExpr.split(",").map(_.trim)
- if (tags.isEmpty) {
+ val tagsExpr = configService.flatMap { cs =>
+ val config = cs.getTenantUserConfigFromCache(userIdentifier.tenantId,
userIdentifier.name)
+ val tagsMeta = config.getWorkerTagsMeta
+ if (tagsMeta.preferClientTagExpr) {
+ Some(clientTagsExpr)
+ } else {
+ Some(tagsMeta.tagsExpr)
+ }
+ }.getOrElse(clientTagsExpr)
+
+ if (tagsExpr.isEmpty) {
logWarning("No tags provided")
- return Collections.emptyList()
+ return workers
}
+ val tags = tagsExpr.split(",").map(_.trim)
+
var workersForTags: Option[JSet[String]] = None
tags.foreach { tag =>
val taggedWorkers = getTagStore.getOrDefault(tag, Collections.emptySet())
@@ -71,7 +84,7 @@ class TagsManager(configService: Option[ConfigService])
extends Logging {
}
if (workersForTags.isEmpty) {
- logWarning(s"No workers for tags: $tagExpr found in cluster")
+ logWarning(s"No workers for tags: $tagsExpr found in cluster")
return Collections.emptyList()
}
diff --git a/master/src/test/resources/dynamicConfig-tags.yaml
b/master/src/test/resources/dynamicConfig-tags.yaml
index ea06ec31f..ca90a0e53 100644
--- a/master/src/test/resources/dynamicConfig-tags.yaml
+++ b/master/src/test/resources/dynamicConfig-tags.yaml
@@ -32,4 +32,7 @@
users:
- name: Jerry
config:
- celeborn.test.int.only: 10
+ celeborn.tags.preferClientTagsExpr: true
+ - name: Tom
+ config:
+ celeborn.tags.tagsExpr: 'tag1'
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
index fbffa478d..54939aaa8 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters.seqAsJavaListConverter
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.WorkerInfo
import
org.apache.celeborn.server.common.service.config.DynamicConfigServiceFactory
@@ -36,6 +37,8 @@ class TagsManagerSuite extends CelebornFunSuite {
private val workers = List(WORKER1, WORKER2, WORKER3).asJava
+ private val user = UserIdentifier("tenant_01", "Jerry")
+
override def beforeEach(): Unit = {
super.beforeEach()
DynamicConfigServiceFactory.reset()
@@ -51,7 +54,7 @@ class TagsManagerSuite extends CelebornFunSuite {
tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())
{
- val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
assert(taggedWorkers.size == 2)
assert(taggedWorkers.contains(WORKER1))
assert(taggedWorkers.contains(WORKER2))
@@ -59,7 +62,7 @@ class TagsManagerSuite extends CelebornFunSuite {
}
{
- val taggedWorkers = tagsManager.getTaggedWorkers(TAG2, workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG2, workers)
assert(taggedWorkers.size == 2)
assert(!taggedWorkers.contains(WORKER1))
assert(taggedWorkers.contains(WORKER2))
@@ -76,7 +79,7 @@ class TagsManagerSuite extends CelebornFunSuite {
{
// Test an unknown tag
- val taggedWorkers = tagsManager.getTaggedWorkers("unknown-tag", workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, "unknown-tag",
workers)
assert(taggedWorkers.isEmpty)
}
@@ -104,7 +107,7 @@ class TagsManagerSuite extends CelebornFunSuite {
{
// Remove tag from worker
tagsManager.removeTagFromWorker(TAG1, WORKER2.toUniqueId())
- val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
assert(taggedWorkers.size == 1)
assert(taggedWorkers.contains(WORKER1))
assert(!taggedWorkers.contains(WORKER2))
@@ -114,7 +117,7 @@ class TagsManagerSuite extends CelebornFunSuite {
{
// Remove tag from cluster
tagsManager.removeTagFromCluster(TAG1)
- val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
assert(taggedWorkers.isEmpty)
val tags = tagsManager.getTagsForCluster
@@ -135,7 +138,7 @@ class TagsManagerSuite extends CelebornFunSuite {
tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())
{
- val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag2", workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1,tag2",
workers)
assert(taggedWorkers.size == 1)
assert(!taggedWorkers.contains(WORKER1))
assert(taggedWorkers.contains(WORKER2))
@@ -143,7 +146,7 @@ class TagsManagerSuite extends CelebornFunSuite {
}
{
- val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag3", workers)
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1,tag3",
workers)
assert(taggedWorkers.size == 0)
}
}
@@ -159,7 +162,8 @@ class TagsManagerSuite extends CelebornFunSuite {
tagsManager = new TagsManager(Option(configService))
{
- val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+ // preferClientTagsExpr: true
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
assert(taggedWorkers.size == 2)
assert(taggedWorkers.contains(WORKER1))
assert(taggedWorkers.contains(WORKER2))
@@ -167,11 +171,32 @@ class TagsManagerSuite extends CelebornFunSuite {
}
{
- val taggedWorkers = tagsManager.getTaggedWorkers(TAG2, workers)
+ // preferClientTagsExpr: true
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG2, workers)
assert(taggedWorkers.size == 2)
assert(!taggedWorkers.contains(WORKER1))
assert(taggedWorkers.contains(WORKER2))
assert(taggedWorkers.contains(WORKER3))
}
+
+ {
+ // preferClientTagsExpr: false, adminTagsExpr: "tag1"
+ val user = UserIdentifier("tenant_01", "Tom")
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1,tag2",
workers)
+ assert(taggedWorkers.size == 2)
+ assert(taggedWorkers.contains(WORKER1))
+ assert(taggedWorkers.contains(WORKER2))
+ assert(!taggedWorkers.contains(WORKER3))
+ }
+
+ {
+ // preferClientTagsExpr: false, adminTagsExpr: ""
+ val user = UserIdentifier("tenant_01", "Robin")
+ val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1", workers)
+ assert(taggedWorkers.size == 3)
+ assert(taggedWorkers.contains(WORKER1))
+ assert(taggedWorkers.contains(WORKER2))
+ assert(taggedWorkers.contains(WORKER3))
+ }
}
}
diff --git
a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
index 29c317e7e..c09ac0823 100644
---
a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
+++
b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
@@ -29,6 +29,7 @@ import org.apache.celeborn.common.internal.config.ConfigEntry;
import org.apache.celeborn.common.quota.Quota;
import org.apache.celeborn.common.quota.UserTrafficQuota;
import org.apache.celeborn.common.quota.WorkerTrafficQuota;
+import org.apache.celeborn.common.tags.WorkerTagsMeta;
import org.apache.celeborn.common.util.Utils;
/**
@@ -60,11 +61,8 @@ public abstract class DynamicConfig {
}
}
- public <T> T getValue(
- String configKey,
- ConfigEntry<Object> configEntry,
- Class<T> finalType,
- ConfigType configType) {
+ public <T, V> T getValue(
+ String configKey, ConfigEntry<V> configEntry, Class<T> finalType,
ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType,
configType) : null;
@@ -169,6 +167,20 @@ public abstract class DynamicConfig {
ConfigType.BYTES));
}
+ public WorkerTagsMeta getWorkerTagsMeta() {
+ return new WorkerTagsMeta(
+ getValue(
+ CelebornConf.TAGS_EXPR().key(),
+ CelebornConf.TAGS_EXPR(),
+ String.class,
+ ConfigType.STRING),
+ getValue(
+ CelebornConf.PREFER_CLIENT_TAGS_EXPR().key(),
+ CelebornConf.PREFER_CLIENT_TAGS_EXPR(),
+ Boolean.TYPE,
+ ConfigType.STRING));
+ }
+
public Map<String, String> getConfigs() {
return configs;
}
diff --git
a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
index 23c28d47f..294fa1514 100644
---
a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
+++
b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
@@ -38,11 +38,8 @@ public class SystemConfig extends DynamicConfig {
}
@Override
- public <T> T getValue(
- String configKey,
- ConfigEntry<Object> configEntry,
- Class<T> finalType,
- ConfigType configType) {
+ public <T, V> T getValue(
+ String configKey, ConfigEntry<V> configEntry, Class<T> finalType,
ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType,
configType) : null;