This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 712d9a496 [CELEBORN-1621][CIP-11] Predefined worker tags expr via 
dynamic configs
712d9a496 is described below

commit 712d9a496eabacb954c8931b419d710a79883ab5
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Nov 26 20:40:30 2024 +0800

    [CELEBORN-1621][CIP-11] Predefined worker tags expr via dynamic configs
    
    ### What changes were proposed in this pull request?
    
    Support predefined tags expression for tenant and users via dynamic config. 
Using this admin can configure tags for users/tenants and give permission to 
special users to provide custom tags expression.
    
    ### Why are the changes needed?
    
    
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
    
    ### Does this PR introduce _any_ user-facing change?
    
    NA
    
    ### How was this patch tested?
    UTs
    
    Closes #2936 from s0nskar/admin_tags.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  |  2 +-
 .../org/apache/celeborn/common/CelebornConf.scala  | 20 ++++++++--
 .../celeborn/common/tags/WorkerTagsMeta.scala      | 32 ++++++++++++++++
 docs/configuration/client.md                       |  2 +-
 docs/configuration/master.md                       |  2 +
 .../celeborn/service/deploy/master/Master.scala    |  1 +
 .../service/deploy/master/tags/TagsManager.scala   | 23 +++++++++---
 master/src/test/resources/dynamicConfig-tags.yaml  |  5 ++-
 .../deploy/master/tags/TagsManagerSuite.scala      | 43 +++++++++++++++++-----
 .../common/service/config/DynamicConfig.java       | 22 ++++++++---
 .../server/common/service/config/SystemConfig.java |  7 +---
 11 files changed, 128 insertions(+), 31 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 47c27bced..056747e22 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -116,7 +116,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     .maximumSize(rpcCacheSize)
     .build().asInstanceOf[Cache[Int, ByteBuffer]]
 
-  private val clientTagsExpr = conf.clientTagsExpr
+  private val clientTagsExpr = conf.tagsExpr
   private val mockDestroyFailure = conf.testMockDestroySlotsFailure
   private val authEnabled = conf.authEnabledOnClient
   private var applicationMeta: ApplicationMeta = _
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3e48d9388..b68238e83 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1347,7 +1347,8 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
   def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
 
-  def clientTagsExpr: String = get(CLIENT_TAGS_EXPR)
+  def tagsExpr: String = get(TAGS_EXPR)
+  def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)
 
   // //////////////////////////////////////////////////////
   //                    kerberos                         //
@@ -5980,16 +5981,27 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val CLIENT_TAGS_EXPR: ConfigEntry[String] =
-    buildConf("celeborn.client.tagsExpr")
-      .categories("client")
+  val TAGS_EXPR: ConfigEntry[String] =
+    buildConf("celeborn.tags.tagsExpr")
+      .categories("master", "client")
       .version("0.6.0")
       .doc("Expression to filter workers by tags. The expression is a 
comma-separated list of " +
         "tags. The expression is evaluated as a logical AND of all tags. For 
example, " +
         "`prod,high-io` filters workers that have both the `prod` and 
`high-io` tags.")
+      .dynamic
       .stringConf
       .createWithDefault("")
 
+  val PREFER_CLIENT_TAGS_EXPR: ConfigEntry[Boolean] =
+    buildConf("celeborn.tags.preferClientTagsExpr")
+      .categories("master")
+      .doc("When `true`, prefer the tags expression provided by the client 
over the tags " +
+        "expression provided by the master.")
+      .dynamic
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val MASTER_EXCLUDE_WORKER_UNHEALTHY_DISK_RATIO_THRESHOLD: 
ConfigEntry[Double] =
     buildConf("celeborn.master.excludeWorker.unhealthyDiskRatioThreshold")
       .categories("master")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/tags/WorkerTagsMeta.scala 
b/common/src/main/scala/org/apache/celeborn/common/tags/WorkerTagsMeta.scala
new file mode 100644
index 000000000..df5e669b9
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/tags/WorkerTagsMeta.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.tags
+
+import org.apache.celeborn.common.internal.Logging
+
+case class WorkerTagsMeta(
+    tagsExpr: String,
+    preferClientTagExpr: Boolean) extends Logging {
+
+  override def toString: String = {
+    s"WorkerTagsMeta[" +
+      s"tagsExpr=$tagsExpr, " +
+      s"preferClientTagExp=$preferClientTagExpr" +
+      s"]"
+  }
+}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 2bb7a08f3..53e540425 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -120,7 +120,6 @@ license: |
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always 
use spark built-in shuffle implementation. This configuration is deprecated, 
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 
0.3.0 | celeborn.shuffle.forceFallback.enabled | 
 | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. 
This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | 
celeborn.shuffle.writer | 
 | celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable 
stage rerun. If true, client throws FetchFailedException instead of 
CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure | 
-| celeborn.client.tagsExpr |  | false | Expression to filter workers by tags. 
The expression is a comma-separated list of tags. The expression is evaluated 
as a logical AND of all tags. For example, `prod,high-io` filters workers that 
have both the `prod` and `high-io` tags. | 0.6.0 |  | 
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn clients to connect. Client uses resolver provided by 
celeborn.master.endpoints.resolver to resolve the master endpoints. By default 
Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver` 
which take static master endpoints as input. Allowed pattern: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used [...]
 | celeborn.master.endpoints.resolver | 
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | 
Resolver class that can be used for discovering and updating the master 
endpoints. This allows users to provide a custom master endpoint resolver 
implementation. This is useful in environments where the master nodes might 
change due to scaling operations or infrastructure updates. Clients need to 
ensure that provided resolver class should be present in the classpath. | 0.6.0 
|  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the 
master will enable to check the quota via QuotaManager. When Client side sets 
to true, LifecycleManager will request Master side to check whether the current 
user has enough quota before registration of shuffle. Fallback to the default 
shuffle service of Spark when Master side checks that there is no enough quota 
for current user. | 0.2.0 |  | 
@@ -134,4 +133,5 @@ license: |
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.tags.tagsExpr |  | true | Expression to filter workers by tags. The 
expression is a comma-separated list of tags. The expression is evaluated as a 
logical AND of all tags. For example, `prod,high-io` filters workers that have 
both the `prod` and `high-io` tags. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 4359527c0..a39007198 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -91,4 +91,6 @@ license: |
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the 
tags expression provided by the client over the tags expression provided by the 
master. | 0.6.0 |  | 
+| celeborn.tags.tagsExpr |  | true | Expression to filter workers by tags. The 
expression is a comma-separated list of tags. The expression is evaluated as a 
logical AND of all tags. For example, `prod,high-io` filters workers that have 
both the `prod` and `high-io` tags. | 0.6.0 |  | 
 <!--end-include-->
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index de749a7b5..f394ab3e0 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -863,6 +863,7 @@ private[celeborn] class Master(
     var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
     if (requestSlots.tagsExpr.nonEmpty) {
       availableWorkers = tagsManager.getTaggedWorkers(
+        requestSlots.userIdentifier,
         requestSlots.tagsExpr,
         availableWorkers)
     }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
index fdc6442b2..b03426732 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -25,6 +25,7 @@ import java.util.stream.Collectors
 
 import scala.collection.JavaConverters.{asScalaIteratorConverter, 
mapAsScalaConcurrentMapConverter}
 
+import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.WorkerInfo
 import org.apache.celeborn.common.util.JavaUtils
@@ -50,15 +51,27 @@ class TagsManager(configService: Option[ConfigService]) 
extends Logging {
   }
 
   def getTaggedWorkers(
-      tagExpr: String,
+      userIdentifier: UserIdentifier,
+      clientTagsExpr: String,
       workers: util.List[WorkerInfo]): util.List[WorkerInfo] = {
-    val tags = tagExpr.split(",").map(_.trim)
 
-    if (tags.isEmpty) {
+    val tagsExpr = configService.flatMap { cs =>
+      val config = cs.getTenantUserConfigFromCache(userIdentifier.tenantId, 
userIdentifier.name)
+      val tagsMeta = config.getWorkerTagsMeta
+      if (tagsMeta.preferClientTagExpr) {
+        Some(clientTagsExpr)
+      } else {
+        Some(tagsMeta.tagsExpr)
+      }
+    }.getOrElse(clientTagsExpr)
+
+    if (tagsExpr.isEmpty) {
       logWarning("No tags provided")
-      return Collections.emptyList()
+      return workers
     }
 
+    val tags = tagsExpr.split(",").map(_.trim)
+
     var workersForTags: Option[JSet[String]] = None
     tags.foreach { tag =>
       val taggedWorkers = getTagStore.getOrDefault(tag, Collections.emptySet())
@@ -71,7 +84,7 @@ class TagsManager(configService: Option[ConfigService]) 
extends Logging {
     }
 
     if (workersForTags.isEmpty) {
-      logWarning(s"No workers for tags: $tagExpr found in cluster")
+      logWarning(s"No workers for tags: $tagsExpr found in cluster")
       return Collections.emptyList()
     }
 
diff --git a/master/src/test/resources/dynamicConfig-tags.yaml 
b/master/src/test/resources/dynamicConfig-tags.yaml
index ea06ec31f..ca90a0e53 100644
--- a/master/src/test/resources/dynamicConfig-tags.yaml
+++ b/master/src/test/resources/dynamicConfig-tags.yaml
@@ -32,4 +32,7 @@
    users:
      - name: Jerry
        config:
-         celeborn.test.int.only: 10
+         celeborn.tags.preferClientTagsExpr: true
+     - name: Tom
+       config:
+         celeborn.tags.tagsExpr: 'tag1'
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
index fbffa478d..54939aaa8 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters.seqAsJavaListConverter
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.meta.WorkerInfo
 import 
org.apache.celeborn.server.common.service.config.DynamicConfigServiceFactory
 
@@ -36,6 +37,8 @@ class TagsManagerSuite extends CelebornFunSuite {
 
   private val workers = List(WORKER1, WORKER2, WORKER3).asJava
 
+  private val user = UserIdentifier("tenant_01", "Jerry")
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     DynamicConfigServiceFactory.reset()
@@ -51,7 +54,7 @@ class TagsManagerSuite extends CelebornFunSuite {
     tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())
 
     {
-      val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
       assert(taggedWorkers.size == 2)
       assert(taggedWorkers.contains(WORKER1))
       assert(taggedWorkers.contains(WORKER2))
@@ -59,7 +62,7 @@ class TagsManagerSuite extends CelebornFunSuite {
     }
 
     {
-      val taggedWorkers = tagsManager.getTaggedWorkers(TAG2, workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG2, workers)
       assert(taggedWorkers.size == 2)
       assert(!taggedWorkers.contains(WORKER1))
       assert(taggedWorkers.contains(WORKER2))
@@ -76,7 +79,7 @@ class TagsManagerSuite extends CelebornFunSuite {
 
     {
       // Test an unknown tag
-      val taggedWorkers = tagsManager.getTaggedWorkers("unknown-tag", workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, "unknown-tag", 
workers)
       assert(taggedWorkers.isEmpty)
     }
 
@@ -104,7 +107,7 @@ class TagsManagerSuite extends CelebornFunSuite {
     {
       // Remove tag from worker
       tagsManager.removeTagFromWorker(TAG1, WORKER2.toUniqueId())
-      val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
       assert(taggedWorkers.size == 1)
       assert(taggedWorkers.contains(WORKER1))
       assert(!taggedWorkers.contains(WORKER2))
@@ -114,7 +117,7 @@ class TagsManagerSuite extends CelebornFunSuite {
     {
       // Remove tag from cluster
       tagsManager.removeTagFromCluster(TAG1)
-      val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
       assert(taggedWorkers.isEmpty)
 
       val tags = tagsManager.getTagsForCluster
@@ -135,7 +138,7 @@ class TagsManagerSuite extends CelebornFunSuite {
     tagsManager.addTagToWorker(TAG2, WORKER3.toUniqueId())
 
     {
-      val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag2", workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1,tag2", 
workers)
       assert(taggedWorkers.size == 1)
       assert(!taggedWorkers.contains(WORKER1))
       assert(taggedWorkers.contains(WORKER2))
@@ -143,7 +146,7 @@ class TagsManagerSuite extends CelebornFunSuite {
     }
 
     {
-      val taggedWorkers = tagsManager.getTaggedWorkers("tag1,tag3", workers)
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1,tag3", 
workers)
       assert(taggedWorkers.size == 0)
     }
   }
@@ -159,7 +162,8 @@ class TagsManagerSuite extends CelebornFunSuite {
     tagsManager = new TagsManager(Option(configService))
 
     {
-      val taggedWorkers = tagsManager.getTaggedWorkers(TAG1, workers)
+      // preferClientTagsExpr: true
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG1, workers)
       assert(taggedWorkers.size == 2)
       assert(taggedWorkers.contains(WORKER1))
       assert(taggedWorkers.contains(WORKER2))
@@ -167,11 +171,32 @@ class TagsManagerSuite extends CelebornFunSuite {
     }
 
     {
-      val taggedWorkers = tagsManager.getTaggedWorkers(TAG2, workers)
+      // preferClientTagsExpr: true
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, TAG2, workers)
       assert(taggedWorkers.size == 2)
       assert(!taggedWorkers.contains(WORKER1))
       assert(taggedWorkers.contains(WORKER2))
       assert(taggedWorkers.contains(WORKER3))
     }
+
+    {
+      // preferClientTagsExpr: false, adminTagsExpr: "tag1"
+      val user = UserIdentifier("tenant_01", "Tom")
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1,tag2", 
workers)
+      assert(taggedWorkers.size == 2)
+      assert(taggedWorkers.contains(WORKER1))
+      assert(taggedWorkers.contains(WORKER2))
+      assert(!taggedWorkers.contains(WORKER3))
+    }
+
+    {
+      // preferClientTagsExpr: false, adminTagsExpr: ""
+      val user = UserIdentifier("tenant_01", "Robin")
+      val taggedWorkers = tagsManager.getTaggedWorkers(user, "tag1", workers)
+      assert(taggedWorkers.size == 3)
+      assert(taggedWorkers.contains(WORKER1))
+      assert(taggedWorkers.contains(WORKER2))
+      assert(taggedWorkers.contains(WORKER3))
+    }
   }
 }
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
index 29c317e7e..c09ac0823 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java
@@ -29,6 +29,7 @@ import org.apache.celeborn.common.internal.config.ConfigEntry;
 import org.apache.celeborn.common.quota.Quota;
 import org.apache.celeborn.common.quota.UserTrafficQuota;
 import org.apache.celeborn.common.quota.WorkerTrafficQuota;
+import org.apache.celeborn.common.tags.WorkerTagsMeta;
 import org.apache.celeborn.common.util.Utils;
 
 /**
@@ -60,11 +61,8 @@ public abstract class DynamicConfig {
     }
   }
 
-  public <T> T getValue(
-      String configKey,
-      ConfigEntry<Object> configEntry,
-      Class<T> finalType,
-      ConfigType configType) {
+  public <T, V> T getValue(
+      String configKey, ConfigEntry<V> configEntry, Class<T> finalType, 
ConfigType configType) {
     String configValue = configs.get(configKey);
     T formatValue =
         configValue != null ? formatValue(configKey, configValue, finalType, 
configType) : null;
@@ -169,6 +167,20 @@ public abstract class DynamicConfig {
             ConfigType.BYTES));
   }
 
+  public WorkerTagsMeta getWorkerTagsMeta() {
+    return new WorkerTagsMeta(
+        getValue(
+            CelebornConf.TAGS_EXPR().key(),
+            CelebornConf.TAGS_EXPR(),
+            String.class,
+            ConfigType.STRING),
+        getValue(
+            CelebornConf.PREFER_CLIENT_TAGS_EXPR().key(),
+            CelebornConf.PREFER_CLIENT_TAGS_EXPR(),
+            Boolean.TYPE,
+            ConfigType.STRING));
+  }
+
   public Map<String, String> getConfigs() {
     return configs;
   }
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
index 23c28d47f..294fa1514 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
@@ -38,11 +38,8 @@ public class SystemConfig extends DynamicConfig {
   }
 
   @Override
-  public <T> T getValue(
-      String configKey,
-      ConfigEntry<Object> configEntry,
-      Class<T> finalType,
-      ConfigType configType) {
+  public <T, V> T getValue(
+      String configKey, ConfigEntry<V> configEntry, Class<T> finalType, 
ConfigType configType) {
     String configValue = configs.get(configKey);
     T formatValue =
         configValue != null ? formatValue(configKey, configValue, finalType, 
configType) : null;

Reply via email to