This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 64dd84717 [#1110] improvement(coordinator): introduce pluggable remote 
storage config format (#1329)
64dd84717 is described below

commit 64dd847177709d971a82b34afd4806649404cf75
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Nov 30 12:14:00 2023 +0800

    [#1110] improvement(coordinator): introduce pluggable remote storage config 
format (#1329)
    
    ### What changes were proposed in this pull request?
    
    Introduce the new `DynamicClientConfService` to provide a pluggable 
reader for multiple config file formats.
    It is also backward compatible with the original dynamic config format.
    
    This PR introduces a new YAML parser, which is the recommended format 
going forward; the storage's Hadoop conf can be given in XML style or as k1: v1 pairs. An example follows.
    
    ```yaml
    rssClientConf:
      k1: v1
      k2: v2
    
    remoteStorageInfos:
      hdfs://a-ns01: |+
        <configuration>
            <property>
              <name>k1</name>
              <value>v1</value>
            </property>
    
            <property>
              <name>k2</name>
              <value>v2</value>
            </property>
        </configuration>
    
      hdfs://x-ns01:
        k1: v1
        k2: v2
    ```
    
    ### Why are the changes needed?
    
    Fix: #1110
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    1. UTs
---
 coordinator/pom.xml                                |   4 +
 .../uniffle/coordinator/ApplicationManager.java    | 104 ++++---
 .../uniffle/coordinator/ClientConfManager.java     | 159 ----------
 .../uniffle/coordinator/CoordinatorConf.java       |   7 +
 .../coordinator/CoordinatorGrpcService.java        |   6 +-
 .../uniffle/coordinator/CoordinatorServer.java     |  20 +-
 .../uniffle/coordinator/conf/ClientConf.java       |  39 +--
 .../uniffle/coordinator/conf/ClientConfParser.java |  28 +-
 .../coordinator/conf/DynamicClientConfService.java | 171 +++++++++++
 .../coordinator/conf/LegacyClientConfParser.java   | 104 +++++++
 .../coordinator/conf/YamlClientConfParser.java     |  77 +++++
 .../uniffle/coordinator/ClientConfManagerTest.java | 329 ---------------------
 .../conf/DynamicClientConfServiceTest.java         |  94 +++---
 .../conf/LegacyClientConfParserTest.java           |  29 +-
 .../coordinator/conf/YamlClientConfParserTest.java | 112 +++++++
 .../src/test/resources/dynamicClientConf.legacy    |  23 ++
 .../src/test/resources/dynamicClientConf.yaml      |  41 +++
 ...ava => DynamicClientConfServiceHadoopTest.java} |  27 +-
 ...micClientConfServiceKerberlizedHadoopTest.java} |   6 +-
 pom.xml                                            |   6 +
 20 files changed, 735 insertions(+), 651 deletions(-)

diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index 7707871df..a1c9cccab 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -86,6 +86,10 @@
       <groupId>org.mockito</groupId>
       <artifactId>mockito-inline</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 626425235..92f780341 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.coordinator;
 import java.io.Closeable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.Range;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -41,16 +44,16 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.Application;
 import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker;
+import org.apache.uniffle.coordinator.conf.ClientConf;
+import org.apache.uniffle.coordinator.conf.LegacyClientConfParser;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 import 
org.apache.uniffle.coordinator.strategy.storage.AppBalanceSelectStorageStrategy;
 import 
org.apache.uniffle.coordinator.strategy.storage.LowestIOSampleCostSelectStorageStrategy;
 import org.apache.uniffle.coordinator.strategy.storage.RankValue;
 import org.apache.uniffle.coordinator.strategy.storage.SelectStorageStrategy;
-import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
 public class ApplicationManager implements Closeable {
 
@@ -152,48 +155,69 @@ public class ApplicationManager implements Closeable {
     }
   }
 
-  public void refreshRemoteStorage(String remoteStoragePath, String 
remoteStorageConf) {
-    if (!StringUtils.isEmpty(remoteStoragePath)) {
-      LOG.info("Refresh remote storage with {} {}", remoteStoragePath, 
remoteStorageConf);
-      Set<String> paths = 
Sets.newHashSet(remoteStoragePath.split(Constants.COMMA_SPLIT_CHAR));
-      Map<String, Map<String, String>> confKVs =
-          CoordinatorUtils.extractRemoteStorageConf(remoteStorageConf);
-      // add remote path if not exist
-      for (String path : paths) {
-        if (!availableRemoteStorageInfo.containsKey(path)) {
-          remoteStoragePathRankValue.computeIfAbsent(
-              path,
-              key -> {
-                // refreshRemoteStorage is designed without multiple thread 
problem
-                // metrics shouldn't be added duplicated
-                addRemoteStorageMetrics(path);
-                return new RankValue(0);
-              });
-        }
-        String storageHost = getStorageHost(path);
-        RemoteStorageInfo rsInfo =
-            new RemoteStorageInfo(path, confKVs.getOrDefault(storageHost, 
Maps.newHashMap()));
-        availableRemoteStorageInfo.put(path, rsInfo);
-      }
-      // remove unused remote path if exist
-      List<String> unusedPath = Lists.newArrayList();
-      for (String existPath : availableRemoteStorageInfo.keySet()) {
-        if (!paths.contains(existPath)) {
-          unusedPath.add(existPath);
-        }
-      }
-      // remote unused path
-      for (String path : unusedPath) {
-        availableRemoteStorageInfo.remove(path);
-        // try to remove if counter = 0, or it will be removed in 
decRemoteStorageCounter() later
-        removePathFromCounter(path);
-      }
-    } else {
-      LOG.info("Refresh remote storage with empty value {}", 
remoteStoragePath);
+  public void refreshRemoteStorages(ClientConf dynamicClientConf) {
+    if (dynamicClientConf == null || 
MapUtils.isEmpty(dynamicClientConf.getRemoteStorageInfos())) {
+      LOG.info("Refresh remote storage with empty config");
       for (String path : availableRemoteStorageInfo.keySet()) {
         removePathFromCounter(path);
       }
       availableRemoteStorageInfo.clear();
+      return;
+    }
+
+    Map<String, RemoteStorageInfo> remoteStorageInfoMap = 
dynamicClientConf.getRemoteStorageInfos();
+    LOG.info("Refresh remote storage with {}", remoteStorageInfoMap);
+
+    for (Map.Entry<String, RemoteStorageInfo> entry : 
remoteStorageInfoMap.entrySet()) {
+      String path = entry.getKey();
+      RemoteStorageInfo rsInfo = entry.getValue();
+
+      if (!availableRemoteStorageInfo.containsKey(path)) {
+        remoteStoragePathRankValue.computeIfAbsent(
+            path,
+            key -> {
+              // refreshRemoteStorage is designed without multiple thread 
problem
+              // metrics shouldn't be added duplicated
+              addRemoteStorageMetrics(path);
+              return new RankValue(0);
+            });
+      }
+      availableRemoteStorageInfo.put(path, rsInfo);
+    }
+    // remove unused remote path if exist
+    List<String> unusedPath = Lists.newArrayList();
+    for (String existPath : availableRemoteStorageInfo.keySet()) {
+      if (!remoteStorageInfoMap.containsKey(existPath)) {
+        unusedPath.add(existPath);
+      }
+    }
+    // remote unused path
+    for (String path : unusedPath) {
+      availableRemoteStorageInfo.remove(path);
+      // try to remove if counter = 0, or it will be removed in 
decRemoteStorageCounter() later
+      removePathFromCounter(path);
+    }
+  }
+
+  // only for test
+  @VisibleForTesting
+  public void refreshRemoteStorage(String remoteStoragePath, String 
remoteStorageConf) {
+    try {
+      LegacyClientConfParser parser = new LegacyClientConfParser();
+
+      String remoteStorageConfRaw =
+          String.format(
+              "%s %s %n %s %s",
+              CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
+              remoteStoragePath,
+              CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF.key(),
+              remoteStorageConf);
+
+      ClientConf conf =
+          parser.tryParse(IOUtils.toInputStream(remoteStorageConfRaw, 
StandardCharsets.UTF_8));
+      refreshRemoteStorages(conf);
+    } catch (Exception e) {
+      LOG.error("Errors on refreshing remote storage", e);
     }
   }
 
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
deleted file mode 100644
index d77badfb5..000000000
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.coordinator;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
-import org.apache.uniffle.common.util.JavaUtils;
-import org.apache.uniffle.common.util.ThreadUtils;
-
-public class ClientConfManager implements Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ClientConfManager.class);
-
-  private Map<String, String> clientConf = JavaUtils.newConcurrentMap();
-  private final AtomicLong lastCandidatesUpdateMS = new AtomicLong(0L);
-  private Path path;
-  private ScheduledExecutorService updateClientConfSES = null;
-  private FileSystem fileSystem;
-  private static final String WHITESPACE_REGEX = "\\s+";
-  private ApplicationManager applicationManager;
-
-  public ClientConfManager(
-      CoordinatorConf conf, Configuration hadoopConf, ApplicationManager 
applicationManager)
-      throws Exception {
-    if 
(conf.getBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED)) {
-      this.applicationManager = applicationManager;
-      init(conf, hadoopConf);
-    }
-  }
-
-  private void init(CoordinatorConf conf, Configuration hadoopConf) throws 
Exception {
-    String pathStr = 
conf.get(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH);
-    this.path = new Path(pathStr);
-
-    this.fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
-
-    if (!fileSystem.isFile(path)) {
-      String msg = String.format("Fail to init ClientConfManager, %s is not a 
file.", path.toUri());
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
-    updateClientConfInternal();
-    LOG.info("Load client conf from {} successfully", pathStr);
-
-    int updateIntervalS =
-        
conf.getInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC);
-    updateClientConfSES = 
ThreadUtils.getDaemonSingleThreadScheduledExecutor("ClientConfManager");
-    updateClientConfSES.scheduleAtFixedRate(
-        this::updateClientConf, 0, updateIntervalS, TimeUnit.SECONDS);
-  }
-
-  private void updateClientConf() {
-    try {
-      FileStatus[] fileStatus = fileSystem.listStatus(path);
-      if (!ArrayUtils.isEmpty(fileStatus)) {
-        long modifiedMS = fileStatus[0].getModificationTime();
-        if (lastCandidatesUpdateMS.get() != modifiedMS) {
-          updateClientConfInternal();
-          lastCandidatesUpdateMS.set(modifiedMS);
-          LOG.info("Update client conf from {} successfully.", path);
-        }
-      } else {
-        LOG.warn("Client conf file not found with {}", path);
-      }
-    } catch (Exception e) {
-      LOG.warn("Error when update client conf with {}.", path, e);
-    }
-  }
-
-  private void updateClientConfInternal() {
-    Map<String, String> newClientConf = JavaUtils.newConcurrentMap();
-    String content = loadClientConfContent();
-    if (StringUtils.isEmpty(content)) {
-      clientConf = newClientConf;
-      LOG.warn("Load empty content from {}, ignore this updating.", 
path.toUri().toString());
-      return;
-    }
-
-    boolean hasRemoteStorageConf = false;
-    String remoteStoragePath = "";
-    String remoteStorageConf = "";
-    for (String item : content.split(IOUtils.LINE_SEPARATOR_UNIX)) {
-      String confItem = item.trim();
-      if (!StringUtils.isEmpty(confItem)) {
-        String[] confKV = confItem.split(WHITESPACE_REGEX);
-        if (confKV.length == 2) {
-          if 
(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key().equals(confKV[0])) {
-            hasRemoteStorageConf = true;
-            remoteStoragePath = confKV[1];
-          } else if (CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF
-              .key()
-              .equals(confKV[0])) {
-            remoteStorageConf = confKV[1];
-          } else {
-            newClientConf.put(confKV[0], confKV[1]);
-          }
-        }
-      }
-    }
-    if (hasRemoteStorageConf) {
-      applicationManager.refreshRemoteStorage(remoteStoragePath, 
remoteStorageConf);
-    }
-
-    clientConf = newClientConf;
-  }
-
-  private String loadClientConfContent() {
-    String content = null;
-    try (FSDataInputStream in = fileSystem.open(path)) {
-      content = IOUtils.toString(in, StandardCharsets.UTF_8);
-    } catch (IOException e) {
-      LOG.error("Fail to load content from {}", path.toUri().toString());
-    }
-    return content;
-  }
-
-  public Map<String, String> getClientConf() {
-    return clientConf;
-  }
-
-  @Override
-  public void close() {
-    if (updateClientConfSES != null) {
-      updateClientConfSES.shutdownNow();
-    }
-  }
-}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 466dda8b2..7e72036c1 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -23,6 +23,7 @@ import org.apache.uniffle.common.config.ConfigOption;
 import org.apache.uniffle.common.config.ConfigOptions;
 import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.coordinator.conf.ClientConfParser;
 import 
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
 import 
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
 
@@ -129,6 +130,12 @@ public class CoordinatorConf extends RssBaseConf {
           .stringType()
           .noDefaultValue()
           .withDescription("dynamic client conf of this cluster");
+  public static final ConfigOption<ClientConfParser.Parser>
+      COORDINATOR_DYNAMIC_CLIENT_RAW_CONF_PARSER =
+          ConfigOptions.key("rss.coordinator.dynamicClientConf.parser")
+              .enumType(ClientConfParser.Parser.class)
+              .defaultValue(ClientConfParser.Parser.LEGACY)
+              .withDescription("dynamic client conf parser");
   public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_PATH =
       ConfigOptions.key("rss.coordinator.remote.storage.path")
           .stringType()
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index dffc3ff16..e5bc88fcb 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -38,6 +38,7 @@ import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
 import 
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 import org.apache.uniffle.proto.CoordinatorServerGrpc;
@@ -307,8 +308,9 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
             .getCoordinatorConf()
             
.getBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED);
     if (dynamicConfEnabled) {
-      ClientConfManager clientConfManager = 
coordinatorServer.getClientConfManager();
-      for (Map.Entry<String, String> kv : 
clientConfManager.getClientConf().entrySet()) {
+      DynamicClientConfService dynamicClientConfService =
+          coordinatorServer.getDynamicClientConfService();
+      for (Map.Entry<String, String> kv : 
dynamicClientConfService.getRssClientConf().entrySet()) {
         builder.addClientConf(
             
ClientConfItem.newBuilder().setKey(kv.getKey()).setValue(kv.getValue()).build());
       }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index befbb550b..c64156b65 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.function.Consumer;
+
 import io.prometheus.client.CollectorRegistry;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -37,6 +39,8 @@ import 
org.apache.uniffle.common.security.SecurityContextFactory;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
 import org.apache.uniffle.common.web.JettyServer;
+import org.apache.uniffle.coordinator.conf.ClientConf;
+import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
 import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
@@ -59,7 +63,7 @@ public class CoordinatorServer extends ReconfigurableBase {
   private ServerInterface server;
   private ClusterManager clusterManager;
   private AssignmentStrategy assignmentStrategy;
-  private ClientConfManager clientConfManager;
+  private DynamicClientConfService dynamicClientConfService;
   private AccessManager accessManager;
   private ApplicationManager applicationManager;
   private GRPCMetrics grpcMetrics;
@@ -133,8 +137,8 @@ public class CoordinatorServer extends ReconfigurableBase {
     if (accessManager != null) {
       accessManager.close();
     }
-    if (clientConfManager != null) {
-      clientConfManager.close();
+    if (dynamicClientConfService != null) {
+      dynamicClientConfService.close();
     }
     if (metricReporter != null) {
       metricReporter.stop();
@@ -177,7 +181,11 @@ public class CoordinatorServer extends ReconfigurableBase {
         new ClusterManagerFactory(coordinatorConf, hadoopConf);
 
     this.clusterManager = clusterManagerFactory.getClusterManager();
-    this.clientConfManager = new ClientConfManager(coordinatorConf, 
hadoopConf, applicationManager);
+    this.dynamicClientConfService =
+        new DynamicClientConfService(
+            coordinatorConf,
+            hadoopConf,
+            new Consumer[] {(Consumer<ClientConf>) 
applicationManager::refreshRemoteStorages});
     AssignmentStrategyFactory assignmentStrategyFactory =
         new AssignmentStrategyFactory(coordinatorConf, clusterManager);
     this.assignmentStrategy = 
assignmentStrategyFactory.getAssignmentStrategy();
@@ -247,8 +255,8 @@ public class CoordinatorServer extends ReconfigurableBase {
     return accessManager;
   }
 
-  public ClientConfManager getClientConfManager() {
-    return clientConfManager;
+  public DynamicClientConfService getDynamicClientConfService() {
+    return dynamicClientConfService;
   }
 
   public GRPCMetrics getGrpcMetrics() {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
 b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/ClientConf.java
similarity index 50%
copy from 
integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
copy to 
coordinator/src/main/java/org/apache/uniffle/coordinator/conf/ClientConf.java
index 6e06a3ff9..111679d5d 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/ClientConf.java
@@ -15,28 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.test;
+package org.apache.uniffle.coordinator.conf;
 
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import java.util.Map;
 
-import org.apache.uniffle.common.KerberizedHadoopBase;
+import org.apache.uniffle.common.RemoteStorageInfo;
 
-public class ClientConfManagerKerberlizedHadoopTest extends 
KerberizedHadoopBase {
+/**
+ * This class is to hold the dynamic conf, which includes the rss conf for the 
client and the remote
+ * storage hadoop configs.
+ */
+public class ClientConf {
+  private Map<String, String> rssClientConf;
+
+  // key:remote-path, val: storage-conf
+  private Map<String, RemoteStorageInfo> remoteStorageInfos;
+
+  public ClientConf(
+      Map<String, String> rssClientConf, Map<String, RemoteStorageInfo> 
remoteStorageInfos) {
+    this.rssClientConf = rssClientConf;
+    this.remoteStorageInfos = remoteStorageInfos;
+  }
 
-  @BeforeAll
-  public static void beforeAll() throws Exception {
-    testRunner = ClientConfManagerKerberlizedHadoopTest.class;
-    KerberizedHadoopBase.init();
+  public Map<String, String> getRssClientConf() {
+    return rssClientConf;
   }
 
-  @Test
-  public void testConfInHadoop() throws Exception {
-    String cfgFile = kerberizedHadoop.getSchemeAndAuthorityPrefix() + 
"/test/client_conf";
-    ClientConfManagerHadoopTest.createAndRunClientConfManagerCases(
-        kerberizedHadoop.getSchemeAndAuthorityPrefix(),
-        cfgFile,
-        kerberizedHadoop.getFileSystem(),
-        kerberizedHadoop.getConf());
+  public Map<String, RemoteStorageInfo> getRemoteStorageInfos() {
+    return remoteStorageInfos;
   }
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/ClientConfParser.java
similarity index 50%
copy from 
integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
copy to 
coordinator/src/main/java/org/apache/uniffle/coordinator/conf/ClientConfParser.java
index 6e06a3ff9..2de7346c9 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/ClientConfParser.java
@@ -15,28 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.test;
+package org.apache.uniffle.coordinator.conf;
 
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import java.io.InputStream;
 
-import org.apache.uniffle.common.KerberizedHadoopBase;
-
-public class ClientConfManagerKerberlizedHadoopTest extends 
KerberizedHadoopBase {
-
-  @BeforeAll
-  public static void beforeAll() throws Exception {
-    testRunner = ClientConfManagerKerberlizedHadoopTest.class;
-    KerberizedHadoopBase.init();
+public interface ClientConfParser {
+  enum Parser {
+    YAML,
+    LEGACY,
+    MIXED
   }
 
-  @Test
-  public void testConfInHadoop() throws Exception {
-    String cfgFile = kerberizedHadoop.getSchemeAndAuthorityPrefix() + 
"/test/client_conf";
-    ClientConfManagerHadoopTest.createAndRunClientConfManagerCases(
-        kerberizedHadoop.getSchemeAndAuthorityPrefix(),
-        cfgFile,
-        kerberizedHadoop.getFileSystem(),
-        kerberizedHadoop.getConf());
-  }
+  ClientConf tryParse(InputStream fileInputStream) throws Exception;
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/DynamicClientConfService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/DynamicClientConfService.java
new file mode 100644
index 000000000..3b8ae7bc1
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/DynamicClientConfService.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+
+public class DynamicClientConfService implements Closeable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DynamicClientConfService.class);
+
+  private Path confStoredPath;
+  private FileSystem fileSystem;
+
+  private ClientConfParser[] parsers;
+
+  private final AtomicLong latestModificationMS = new AtomicLong(0L);
+  private ScheduledExecutorService updateClientConfExecutor = null;
+
+  private final Object clientConfLock = new Object();
+  private ClientConf clientConf = null;
+
+  private Consumer<ClientConf>[] callbacks;
+
+  public DynamicClientConfService(CoordinatorConf coordinatorConf, 
Configuration hadoopConf)
+      throws Exception {
+    this(coordinatorConf, hadoopConf, new Consumer[0]);
+  }
+
+  public DynamicClientConfService(
+      CoordinatorConf coordinatorConf, Configuration hadoopConf, 
Consumer<ClientConf>[] callbacks)
+      throws Exception {
+    if 
(!coordinatorConf.getBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED))
 {
+      return;
+    }
+
+    this.callbacks = callbacks;
+
+    String clientConfStoredRawPath =
+        
coordinatorConf.getString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH);
+    this.confStoredPath = new Path(clientConfStoredRawPath);
+    this.fileSystem = HadoopFilesystemProvider.getFilesystem(confStoredPath, 
hadoopConf);
+
+    ClientConfParser.Parser parserType =
+        
coordinatorConf.get(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_RAW_CONF_PARSER);
+    ClientConfParser[] confParsers;
+    switch (parserType) {
+      case YAML:
+        confParsers = new ClientConfParser[] {new YamlClientConfParser()};
+        break;
+      case LEGACY:
+        confParsers = new ClientConfParser[] {new LegacyClientConfParser()};
+        break;
+      case MIXED:
+      default:
+        confParsers =
+            new ClientConfParser[] {new LegacyClientConfParser(), new 
YamlClientConfParser()};
+        break;
+    }
+    this.parsers = confParsers;
+
+    if (!fileSystem.isFile(confStoredPath)) {
+      String msg = String.format("Fail to init, %s is not a file.", 
confStoredPath.toUri());
+      LOGGER.error(msg);
+      throw new IllegalStateException(msg);
+    }
+    refreshClientConf();
+    LOGGER.info("Load client conf from {} successfully", confStoredPath);
+
+    int updateIntervalSec =
+        coordinatorConf.getInteger(
+            
CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC);
+    updateClientConfExecutor =
+        
ThreadUtils.getDaemonSingleThreadScheduledExecutor(this.getClass().getSimpleName());
+    updateClientConfExecutor.scheduleAtFixedRate(
+        this::refreshClientConf, 0, updateIntervalSec, TimeUnit.SECONDS);
+  }
+
+  private void refreshClientConf() {
+    try {
+      FileStatus[] fileStatus = fileSystem.listStatus(confStoredPath);
+      if (ArrayUtils.isNotEmpty(fileStatus)) {
+        long modifiedMS = fileStatus[0].getModificationTime();
+        if (latestModificationMS.get() != modifiedMS) {
+          doRefreshClientConf();
+          latestModificationMS.set(modifiedMS);
+          Arrays.stream(callbacks).forEach(x -> x.accept(clientConf));
+          LOGGER.info("Update client conf from {} successfully.", 
confStoredPath);
+        }
+      } else {
+        LOGGER.warn("Client conf file not found with {}", confStoredPath);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error when update client conf with {}.", confStoredPath, e);
+    }
+  }
+
+  private void doRefreshClientConf() throws Exception {
+    try (FSDataInputStream in = fileSystem.open(confStoredPath)) {
+      for (ClientConfParser parser : parsers) {
+        try {
+          ClientConf conf = parser.tryParse(in);
+          synchronized (clientConfLock) {
+            this.clientConf = conf;
+          }
+          return;
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Fail to refresh client conf from {}", 
confStoredPath.toUri().toString());
+      return;
+    }
+
+    throw new Exception("Unknown format of clientConf file.");
+  }
+
+  public Map<String, String> getRssClientConf() {
+    synchronized (clientConfLock) {
+      return clientConf.getRssClientConf();
+    }
+  }
+
+  public Map<String, RemoteStorageInfo> listRemoteStorageInfos() {
+    synchronized (clientConfLock) {
+      return clientConf.getRemoteStorageInfos();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (updateClientConfExecutor != null) {
+      updateClientConfExecutor.shutdownNow();
+    }
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/LegacyClientConfParser.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/LegacyClientConfParser.java
new file mode 100644
index 000000000..a199ed8ec
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/LegacyClientConfParser.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
+
+public class LegacyClientConfParser implements ClientConfParser {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LegacyClientConfParser.class);
+  private static final String WHITESPACE_REGEX = "\\s+";
+
+  @Override
+  public ClientConf tryParse(InputStream fileInputStream) throws Exception {
+    String content = IOUtils.toString(fileInputStream, StandardCharsets.UTF_8);
+
+    String remoteStoragePath = "";
+    String remoteStorageConf = "";
+
+    Map<String, String> rssClientConf = new HashMap<>();
+
+    for (String item : content.split(IOUtils.LINE_SEPARATOR_UNIX)) {
+      String confItem = item.trim();
+      if (StringUtils.isNotEmpty(confItem)) {
+        String[] confKV = confItem.split(WHITESPACE_REGEX);
+        if (confKV.length == 2) {
+          if 
(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key().equals(confKV[0])) {
+            remoteStoragePath = confKV[1];
+          } else if (CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF
+              .key()
+              .equals(confKV[0])) {
+            remoteStorageConf = confKV[1];
+          } else {
+            rssClientConf.put(confKV[0], confKV[1]);
+          }
+        }
+      }
+    }
+
+    Map<String, RemoteStorageInfo> storageInfoMap =
+        parseRemoteStorageInfos(remoteStoragePath, remoteStorageConf);
+
+    return new ClientConf(rssClientConf, storageInfoMap);
+  }
+
+  private Map<String, RemoteStorageInfo> parseRemoteStorageInfos(
+      String remoteStoragePath, String remoteStorageConf) {
+    if (StringUtils.isNotEmpty(remoteStoragePath)) {
+      LOG.info("Parsing remote storage with {} {}", remoteStoragePath, 
remoteStorageConf);
+
+      Set<String> paths = 
Sets.newHashSet(remoteStoragePath.split(Constants.COMMA_SPLIT_CHAR));
+      Map<String, Map<String, String>> confKVs =
+          CoordinatorUtils.extractRemoteStorageConf(remoteStorageConf);
+
+      Map<String, RemoteStorageInfo> remoteStorageInfoMap = new HashMap<>();
+      for (String path : paths) {
+        try {
+          URI uri = new URI(path);
+          String host = uri.getHost();
+          Map<String, String> kvs = confKVs.get(host);
+          remoteStorageInfoMap.put(
+              path, kvs == null ? new RemoteStorageInfo(path) : new 
RemoteStorageInfo(path, kvs));
+        } catch (URISyntaxException e) {
+          LOG.warn("The remote storage path: {} is illegal. Ignore this 
storage", path);
+        }
+      }
+
+      return remoteStorageInfoMap;
+    }
+
+    return Collections.emptyMap();
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/YamlClientConfParser.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/YamlClientConfParser.java
new file mode 100644
index 000000000..84841a3a6
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/YamlClientConfParser.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.yaml.snakeyaml.Yaml;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+
+/** The conf will be stored in the yaml format file. */
+public class YamlClientConfParser implements ClientConfParser {
+  private static final String RSS_CLIENT_CONF_KEY = "rssClientConf";
+  private static final String REMOTE_STORAGE_INFOS_KEY = "remoteStorageInfos";
+
+  @Override
+  public ClientConf tryParse(InputStream fileInputStream) throws Exception {
+    Yaml yaml = new Yaml();
+    Map<Object, Object> data = yaml.load(IOUtils.toString(fileInputStream, 
StandardCharsets.UTF_8));
+
+    Object rssClientConfRaw = data.get(RSS_CLIENT_CONF_KEY);
+    Map<String, String> rssConfKVs =
+        rssClientConfRaw == null ? Collections.emptyMap() : 
parseKVItems(rssClientConfRaw);
+
+    Map<String, Object> remoteStorageInfosRaw =
+        (Map<String, Object>) data.getOrDefault(REMOTE_STORAGE_INFOS_KEY, 
Collections.emptyMap());
+
+    Map<String, RemoteStorageInfo> remoteStorageInfoMap = new HashMap<>();
+    for (Map.Entry<String, Object> entry : remoteStorageInfosRaw.entrySet()) {
+      String remotePath = entry.getKey();
+      Map<String, String> kvs = parseKVItems(entry.getValue());
+      remoteStorageInfoMap.put(remotePath, new RemoteStorageInfo(remotePath, 
kvs));
+    }
+
+    return new ClientConf(rssConfKVs, remoteStorageInfoMap);
+  }
+
+  private Map<String, String> parseKVItems(Object confRaw) throws Exception {
+    if (confRaw instanceof Map) {
+      return (Map<String, String>) confRaw;
+    }
+
+    // todo: currently only xml format is supported
+    if (confRaw instanceof String) {
+      Configuration conf = new Configuration(false);
+      conf.addResource(IOUtils.toInputStream((String) confRaw));
+      Map<String, String> kvs = new HashMap<>();
+      for (Map.Entry<String, String> entry : conf) {
+        kvs.put(entry.getKey(), entry.getValue());
+      }
+      return kvs;
+    }
+
+    throw new Exception("No such supported format, only can be 'key : val' or 
xml.");
+  }
+}
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
deleted file mode 100644
index bce3319ba..000000000
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.coordinator;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
-import 
org.apache.uniffle.coordinator.strategy.storage.LowestIOSampleCostSelectStorageStrategy;
-
-import static 
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ClientConfManagerTest {
-
-  @BeforeEach
-  public void setUp() {
-    CoordinatorMetrics.register();
-  }
-
-  @AfterEach
-  public void clear() {
-    CoordinatorMetrics.clear();
-  }
-
-  @Test
-  public void test(@TempDir File tempDir) throws Exception {
-    File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
-    final String cfgFileName = cfgFile.getAbsolutePath();
-    final String filePath =
-        
Objects.requireNonNull(getClass().getClassLoader().getResource("coordinator.conf"))
-            .getFile();
-    CoordinatorConf conf = new CoordinatorConf(filePath);
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
tempDir.toURI().toString());
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-
-    // file load checking at startup
-    Exception expectedException = null;
-    try {
-      new ClientConfManager(conf, new Configuration(), applicationManager);
-    } catch (RuntimeException e) {
-      expectedException = e;
-    }
-    assertNotNull(expectedException);
-    assertTrue(expectedException.getMessage().endsWith("is not a file."));
-
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
cfgFile.toURI().toString());
-    ClientConfManager clientConfManager =
-        new ClientConfManager(conf, new Configuration(), applicationManager);
-    assertEquals(0, clientConfManager.getClientConf().size());
-
-    FileWriter fileWriter = new FileWriter(cfgFile);
-    PrintWriter printWriter = new PrintWriter(fileWriter);
-    printWriter.println("spark.mock.1 abc");
-    printWriter.println(" spark.mock.2   123 ");
-    printWriter.println("spark.mock.3 true  ");
-    printWriter.flush();
-    printWriter.close();
-    // load config at the beginning
-    clientConfManager = new ClientConfManager(conf, new Configuration(), 
applicationManager);
-    Thread.sleep(1200);
-    Map<String, String> clientConf = clientConfManager.getClientConf();
-    assertEquals("abc", clientConf.get("spark.mock.1"));
-    assertEquals("123", clientConf.get("spark.mock.2"));
-    assertEquals("true", clientConf.get("spark.mock.3"));
-    assertEquals(3, clientConf.size());
-
-    // ignore empty or wrong content
-    printWriter.println("");
-    printWriter.flush();
-    printWriter.close();
-    Thread.sleep(1300);
-    assertTrue(cfgFile.exists());
-    clientConf = clientConfManager.getClientConf();
-    assertEquals("abc", clientConf.get("spark.mock.1"));
-    assertEquals("123", clientConf.get("spark.mock.2"));
-    assertEquals("true", clientConf.get("spark.mock.3"));
-    assertEquals(3, clientConf.size());
-
-    // the config will not be changed when the conf file is deleted
-    assertTrue(cfgFile.delete());
-    Thread.sleep(1300);
-    assertFalse(cfgFile.exists());
-    clientConf = clientConfManager.getClientConf();
-    assertEquals("abc", clientConf.get("spark.mock.1"));
-    assertEquals("123", clientConf.get("spark.mock.2"));
-    assertEquals("true", clientConf.get("spark.mock.3"));
-    assertEquals(3, clientConf.size());
-
-    // the normal update config process, move the new conf file to the old one
-    File cfgFileTmp = new File(cfgFileName + ".tmp");
-    fileWriter = new FileWriter(cfgFileTmp);
-    printWriter = new PrintWriter(fileWriter);
-    printWriter.println("spark.mock.4 deadbeaf");
-    printWriter.println("spark.mock.5 9527");
-    printWriter.println("spark.mock.6 9527 3423");
-    printWriter.println("spark.mock.7");
-    printWriter.close();
-    FileUtils.moveFile(cfgFileTmp, cfgFile);
-    Thread.sleep(1200);
-    clientConf = clientConfManager.getClientConf();
-    assertEquals("deadbeaf", clientConf.get("spark.mock.4"));
-    assertEquals("9527", clientConf.get("spark.mock.5"));
-    assertEquals(2, clientConf.size());
-    assertFalse(clientConf.containsKey("spark.mock.6"));
-    assertFalse(clientConf.containsKey("spark.mock.7"));
-    clientConfManager.close();
-  }
-
-  @Test
-  public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception {
-    int updateIntervalSec = 2;
-    final String remotePath1 = "hdfs://host1/path1";
-    final String remotePath2 = "hdfs://host2/path2";
-    final String remotePath3 = "hdfs://host3/path3";
-    File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", 
".conf").toFile();
-    cfgFile.deleteOnExit();
-    writeRemoteStorageConf(cfgFile, remotePath1);
-
-    CoordinatorConf conf = new CoordinatorConf();
-    conf.set(
-        CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 
updateIntervalSec);
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
cfgFile.toURI().toString());
-    conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 
60000);
-    
conf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES,
 1);
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-
-    final ClientConfManager clientConfManager =
-        new ClientConfManager(conf, new Configuration(), applicationManager);
-    applicationManager.getSelectStorageStrategy().detectStorage();
-    Set<String> expectedAvailablePath = Sets.newHashSet(remotePath1);
-    assertEquals(
-        expectedAvailablePath, 
applicationManager.getAvailableRemoteStorageInfo().keySet());
-    RemoteStorageInfo remoteStorageInfo = 
applicationManager.pickRemoteStorage("testAppId1");
-    assertEquals(remotePath1, remoteStorageInfo.getPath());
-    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
-
-    writeRemoteStorageConf(cfgFile, remotePath3);
-    expectedAvailablePath = Sets.newHashSet(remotePath3);
-    waitForUpdate(expectedAvailablePath, applicationManager);
-    applicationManager.getSelectStorageStrategy().detectStorage();
-    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2");
-    assertEquals(remotePath3, remoteStorageInfo.getPath());
-
-    String confItems = "host2,k1=v1,k2=v2;host3,k3=v3";
-    writeRemoteStorageConf(
-        cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, 
confItems);
-    expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3);
-    waitForUpdate(expectedAvailablePath, applicationManager);
-    applicationManager.getSelectStorageStrategy().detectStorage();
-    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3");
-    assertEquals(remotePath2, remoteStorageInfo.getPath());
-    assertEquals(2, remoteStorageInfo.getConfItems().size());
-    assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
-    assertEquals("v2", remoteStorageInfo.getConfItems().get("k2"));
-
-    confItems = "host1,keyTest1=test1,keyTest2=test2;host2,k1=deadbeaf";
-    writeRemoteStorageConf(
-        cfgFile, remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2, 
confItems);
-    expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
-    waitForUpdate(expectedAvailablePath, applicationManager);
-    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId4");
-    // one of remote storage will be chosen
-    assertTrue(
-        (remotePath1.equals(remoteStorageInfo.getPath())
-                    && (remoteStorageInfo.getConfItems().size() == 2)
-                    && 
(remoteStorageInfo.getConfItems().get("keyTest1").equals("test1")))
-                && 
(remoteStorageInfo.getConfItems().get("keyTest2").equals("test2"))
-            || (remotePath2.equals(remoteStorageInfo.getPath())
-                    && remoteStorageInfo.getConfItems().size() == 1)
-                && 
remoteStorageInfo.getConfItems().get("k1").equals("deadbeaf"));
-
-    clientConfManager.close();
-  }
-
-  @Test
-  public void dynamicRemoteByHealthStrategyStorageTest() throws Exception {
-    final int updateIntervalSec = 2;
-    final String remotePath1 = "hdfs://host1/path1";
-    final String remotePath2 = "hdfs://host2/path2";
-    final String remotePath3 = "hdfs://host3/path3";
-    File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", 
".conf").toFile();
-    cfgFile.deleteOnExit();
-    writeRemoteStorageConf(cfgFile, remotePath1);
-
-    CoordinatorConf conf = new CoordinatorConf();
-    conf.set(
-        CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 
updateIntervalSec);
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
cfgFile.toURI().toString());
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
-    conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 
60000);
-    
conf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES,
 1);
-    conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, 
IO_SAMPLE);
-
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    // init readWriteRankScheduler
-    LowestIOSampleCostSelectStorageStrategy selectStorageStrategy =
-        (LowestIOSampleCostSelectStorageStrategy) 
applicationManager.getSelectStorageStrategy();
-    String testPath = "/test";
-
-    final ClientConfManager clientConfManager =
-        new ClientConfManager(conf, new Configuration(), applicationManager);
-    // the reason for sleep here is to ensure that threads can be scheduled 
normally, the same below
-    applicationManager.getSelectStorageStrategy().detectStorage();
-    Set<String> expectedAvailablePath = Sets.newHashSet(remotePath1);
-    assertEquals(
-        expectedAvailablePath, 
applicationManager.getAvailableRemoteStorageInfo().keySet());
-    selectStorageStrategy.sortPathByRankValue(remotePath1, testPath, 
System.currentTimeMillis());
-    RemoteStorageInfo remoteStorageInfo = 
applicationManager.pickRemoteStorage("testAppId1");
-    assertEquals(remotePath1, remoteStorageInfo.getPath());
-    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
-
-    writeRemoteStorageConf(cfgFile, remotePath3);
-    expectedAvailablePath = Sets.newHashSet(remotePath3);
-    waitForUpdate(expectedAvailablePath, applicationManager);
-
-    selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, 
System.currentTimeMillis());
-    applicationManager.getSelectStorageStrategy().detectStorage();
-    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2");
-    assertEquals(remotePath3, remoteStorageInfo.getPath());
-
-    String confItems = "host2,k1=v1,k2=v2;host3,k3=v3";
-    final long current = System.currentTimeMillis();
-    writeRemoteStorageConf(
-        cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, 
confItems);
-    expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3);
-    waitForUpdate(expectedAvailablePath, applicationManager);
-    selectStorageStrategy.sortPathByRankValue(remotePath2, testPath, current);
-    selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, current);
-    applicationManager.getSelectStorageStrategy().detectStorage();
-    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3");
-    assertEquals(remotePath2, remoteStorageInfo.getPath());
-    assertEquals(2, remoteStorageInfo.getConfItems().size());
-    assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
-    assertEquals("v2", remoteStorageInfo.getConfItems().get("k2"));
-
-    confItems = "host1,keyTest1=test1,keyTest2=test2;host2,k1=deadbeaf";
-    writeRemoteStorageConf(
-        cfgFile, remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2, 
confItems);
-    expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
-    waitForUpdate(expectedAvailablePath, applicationManager);
-    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId4");
-    // one of remote storage will be chosen
-    assertTrue(
-        (remotePath1.equals(remoteStorageInfo.getPath())
-                    && (remoteStorageInfo.getConfItems().size() == 2)
-                    && 
(remoteStorageInfo.getConfItems().get("keyTest1").equals("test1")))
-                && 
(remoteStorageInfo.getConfItems().get("keyTest2").equals("test2"))
-            || (remotePath2.equals(remoteStorageInfo.getPath())
-                    && remoteStorageInfo.getConfItems().size() == 1)
-                && 
remoteStorageInfo.getConfItems().get("k1").equals("deadbeaf"));
-
-    clientConfManager.close();
-  }
-
-  private void writeRemoteStorageConf(File cfgFile, String value) throws 
Exception {
-    writeRemoteStorageConf(cfgFile, value, null);
-  }
-
-  private void writeRemoteStorageConf(File cfgFile, String pathItems, String 
confItems)
-      throws Exception {
-    // sleep 2 secs to make sure the modified time will be updated
-    Thread.sleep(2000);
-    FileWriter fileWriter = new FileWriter(cfgFile);
-    PrintWriter printWriter = new PrintWriter(fileWriter);
-    printWriter.println(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key() 
+ " " + pathItems);
-    if (confItems != null) {
-      printWriter.println(
-          CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF.key() + " " 
+ confItems);
-    }
-    printWriter.flush();
-    printWriter.close();
-  }
-
-  private void waitForUpdate(
-      Set<String> expectedAvailablePath, ApplicationManager 
applicationManager) throws Exception {
-    int maxAttempt = 10;
-    int attempt = 0;
-    while (true) {
-      if (attempt > maxAttempt) {
-        throw new RuntimeException("Timeout when update configuration");
-      }
-      Thread.sleep(1000);
-      try {
-        assertEquals(
-            expectedAvailablePath, 
applicationManager.getAvailableRemoteStorageInfo().keySet());
-        break;
-      } catch (Throwable e) {
-        // ignore
-      }
-      attempt++;
-    }
-  }
-}
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHadoopTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
similarity index 61%
copy from 
integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHadoopTest.java
copy to 
coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
index af4e5d762..8a117b86d 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHadoopTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
@@ -15,72 +15,78 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.test;
+package org.apache.uniffle.coordinator.conf;
 
-import java.io.OutputStreamWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.PrintWriter;
 import java.util.Map;
+import java.util.Objects;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-import org.apache.uniffle.coordinator.ApplicationManager;
-import org.apache.uniffle.coordinator.ClientConfManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.storage.HadoopTestBase;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 
-import static java.lang.Thread.sleep;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class ClientConfManagerHadoopTest extends HadoopTestBase {
+public class DynamicClientConfServiceTest {
 
-  @Test
-  public void test() throws Exception {
-    String cfgFile = HDFS_URI + "/test/client_conf";
-    createAndRunClientConfManagerCases(HDFS_URI, cfgFile, fs, 
HadoopTestBase.conf);
+  @BeforeEach
+  public void setUp() {
+    CoordinatorMetrics.register();
   }
 
-  public static void createAndRunClientConfManagerCases(
-      String clusterPathPrefix, String cfgFile, FileSystem fileSystem, 
Configuration hadoopConf)
-      throws Exception {
+  @AfterEach
+  public void clear() {
+    CoordinatorMetrics.clear();
+  }
 
-    CoordinatorConf conf = new CoordinatorConf();
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
clusterPathPrefix);
-    
conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 
1);
+  @Test
+  public void testByLegacyParser(@TempDir File tempDir) throws Exception {
+    File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
+    final String cfgFileName = cfgFile.getAbsolutePath();
+    final String filePath =
+        
Objects.requireNonNull(getClass().getClassLoader().getResource("coordinator.conf"))
+            .getFile();
+    CoordinatorConf conf = new CoordinatorConf(filePath);
+    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
tempDir.toURI().toString());
     conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
 
     // file load checking at startup
     Exception expectedException = null;
     try {
-      new ClientConfManager(conf, new Configuration(), new 
ApplicationManager(conf));
+      new DynamicClientConfService(conf, new Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
     }
     assertNotNull(expectedException);
     assertTrue(expectedException.getMessage().endsWith("is not a file."));
 
-    Path path = new Path(cfgFile);
-    FSDataOutputStream out = fileSystem.create(path);
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile);
-    ClientConfManager clientConfManager =
-        new ClientConfManager(conf, new Configuration(), new 
ApplicationManager(conf));
-    assertEquals(0, clientConfManager.getClientConf().size());
+    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
cfgFile.toURI().toString());
+    DynamicClientConfService clientConfManager =
+        new DynamicClientConfService(conf, new Configuration());
+    assertEquals(0, clientConfManager.getRssClientConf().size());
 
-    PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
+    FileWriter fileWriter = new FileWriter(cfgFile);
+    PrintWriter printWriter = new PrintWriter(fileWriter);
     printWriter.println("spark.mock.1 abc");
     printWriter.println(" spark.mock.2   123 ");
     printWriter.println("spark.mock.3 true  ");
     printWriter.flush();
     printWriter.close();
-    clientConfManager = new ClientConfManager(conf, hadoopConf, new 
ApplicationManager(conf));
-    sleep(1200);
-    Map<String, String> clientConf = clientConfManager.getClientConf();
+    // load config at the beginning
+    clientConfManager = new DynamicClientConfService(conf, new 
Configuration());
+    Thread.sleep(1200);
+    Map<String, String> clientConf = clientConfManager.getRssClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
     assertEquals("true", clientConf.get("spark.mock.3"));
@@ -90,36 +96,36 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     printWriter.println("");
     printWriter.flush();
     printWriter.close();
-    sleep(1300);
-    assertTrue(fileSystem.exists(path));
-    clientConf = clientConfManager.getClientConf();
+    Thread.sleep(1300);
+    assertTrue(cfgFile.exists());
+    clientConf = clientConfManager.getRssClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
     assertEquals("true", clientConf.get("spark.mock.3"));
     assertEquals(3, clientConf.size());
 
     // the config will not be changed when the conf file is deleted
-    fileSystem.delete(path, true);
-    assertFalse(fileSystem.exists(path));
-    sleep(1200);
-    clientConf = clientConfManager.getClientConf();
+    assertTrue(cfgFile.delete());
+    Thread.sleep(1300);
+    assertFalse(cfgFile.exists());
+    clientConf = clientConfManager.getRssClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
     assertEquals("true", clientConf.get("spark.mock.3"));
     assertEquals(3, clientConf.size());
 
     // the normal update config process, move the new conf file to the old one
-    Path tmpPath = new Path(cfgFile + ".tmp");
-    out = fileSystem.create(tmpPath);
-    printWriter = new PrintWriter(new OutputStreamWriter(out));
+    File cfgFileTmp = new File(cfgFileName + ".tmp");
+    fileWriter = new FileWriter(cfgFileTmp);
+    printWriter = new PrintWriter(fileWriter);
     printWriter.println("spark.mock.4 deadbeaf");
     printWriter.println("spark.mock.5 9527");
     printWriter.println("spark.mock.6 9527 3423");
     printWriter.println("spark.mock.7");
     printWriter.close();
-    fileSystem.rename(tmpPath, path);
-    sleep(1200);
-    clientConf = clientConfManager.getClientConf();
+    FileUtils.moveFile(cfgFileTmp, cfgFile);
+    Thread.sleep(1200);
+    clientConf = clientConfManager.getRssClientConf();
     assertEquals("deadbeaf", clientConf.get("spark.mock.4"));
     assertEquals("9527", clientConf.get("spark.mock.5"));
     assertEquals(2, clientConf.size());
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/LegacyClientConfParserTest.java
similarity index 53%
copy from 
integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
copy to 
coordinator/src/test/java/org/apache/uniffle/coordinator/conf/LegacyClientConfParserTest.java
index 6e06a3ff9..579fc1a97 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/LegacyClientConfParserTest.java
@@ -15,28 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.test;
+package org.apache.uniffle.coordinator.conf;
 
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.common.KerberizedHadoopBase;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class ClientConfManagerKerberlizedHadoopTest extends 
KerberizedHadoopBase {
-
-  @BeforeAll
-  public static void beforeAll() throws Exception {
-    testRunner = ClientConfManagerKerberlizedHadoopTest.class;
-    KerberizedHadoopBase.init();
-  }
+public class LegacyClientConfParserTest {
 
   @Test
-  public void testConfInHadoop() throws Exception {
-    String cfgFile = kerberizedHadoop.getSchemeAndAuthorityPrefix() + 
"/test/client_conf";
-    ClientConfManagerHadoopTest.createAndRunClientConfManagerCases(
-        kerberizedHadoop.getSchemeAndAuthorityPrefix(),
-        cfgFile,
-        kerberizedHadoop.getFileSystem(),
-        kerberizedHadoop.getConf());
+  public void testParse() throws Exception {
+    ClientConfParser parser = new LegacyClientConfParser();
+    ClientConf conf =
+        parser.tryParse(
+            
getClass().getClassLoader().getResource("dynamicClientConf.legacy").openStream());
+    assertEquals("v1", conf.getRssClientConf().get("k1"));
+    assertEquals("v2", conf.getRssClientConf().get("k2"));
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://a-ns01").getConfItems().get("k1"));
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://x-ns01").getConfItems().get("k1"));
   }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/YamlClientConfParserTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/YamlClientConfParserTest.java
new file mode 100644
index 000000000..953b03db3
--- /dev/null
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/YamlClientConfParserTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class YamlClientConfParserTest {
+
+  @Test
+  public void testFromFile() throws Exception {
+    ClientConfParser parser = new YamlClientConfParser();
+    ClientConf conf =
+        parser.tryParse(
+            
getClass().getClassLoader().getResource("dynamicClientConf.yaml").openStream());
+    assertEquals("v1", conf.getRssClientConf().get("k1"));
+    assertEquals("v2", conf.getRssClientConf().get("k2"));
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://a-ns01").getConfItems().get("k1"));
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://x-ns01").getConfItems().get("k1"));
+  }
+
+  @Test
+  public void testParse() throws Exception {
+    ClientConfParser parser = new YamlClientConfParser();
+
+    // rssClientConf in 'k: v' key-value format
+
+    String yaml = "rssClientConf:\n" + "    k1: v1\n" + "    k2: v2";
+
+    ClientConf conf = parser.tryParse(IOUtils.toInputStream(yaml));
+    assertEquals(2, conf.getRssClientConf().size());
+    assertEquals("v1", conf.getRssClientConf().get("k1"));
+    assertEquals("v2", conf.getRssClientConf().get("k2"));
+
+    // rssClientConf in Hadoop XML configuration format
+
+    yaml =
+        "rssClientConf: |+\n"
+            + "   \t<configuration>\n"
+            + "    <property>\n"
+            + "      <name>k1</name>\n"
+            + "      <value>v1</value>\n"
+            + "    </property>\n"
+            + "    <property>\n"
+            + "      <name>k2</name>\n"
+            + "      <value>v2</value>\n"
+            + "    </property>\n"
+            + "    </configuration>";
+
+    conf = parser.tryParse(IOUtils.toInputStream(yaml));
+    assertEquals(2, conf.getRssClientConf().size());
+    assertEquals("v1", conf.getRssClientConf().get("k1"));
+    assertEquals("v2", conf.getRssClientConf().get("k2"));
+
+    // remote storage conf in "k: v" key-value format
+
+    yaml =
+        "remoteStorageInfos:\n"
+            + "   hdfs://a-ns01:\n"
+            + "      k1: v1\n"
+            + "      k2: v2\n"
+            + "   hdfs://x-ns01:\n"
+            + "      k1: v1\n"
+            + "      k2: v2";
+    conf = parser.tryParse(IOUtils.toInputStream(yaml));
+    assertEquals(0, conf.getRssClientConf().size());
+    assertEquals(2, conf.getRemoteStorageInfos().size());
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://a-ns01").getConfItems().get("k1"));
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://x-ns01").getConfItems().get("k1"));
+
+    yaml =
+        "remoteStorageInfos:\n"
+            + "   hdfs://a-ns01: |+\n"
+            + "      <configuration>\n"
+            + "      <property>\n"
+            + "        <name>k1</name>\n"
+            + "        <value>v1</value>\n"
+            + "      </property>\n"
+            + "      <property>\n"
+            + "        <name>k2</name>\n"
+            + "        <value>v2</value>\n"
+            + "      </property>\n"
+            + "      </configuration>\n"
+            + "      \n"
+            + "   hdfs://x-ns01:\n"
+            + "      k1: v1\n"
+            + "      k2: v2\n"
+            + "\n";
+    conf = parser.tryParse(IOUtils.toInputStream(yaml));
+    assertEquals(0, conf.getRssClientConf().size());
+    assertEquals(2, conf.getRemoteStorageInfos().size());
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://a-ns01").getConfItems().get("k1"));
+    assertEquals("v1", 
conf.getRemoteStorageInfos().get("hdfs://x-ns01").getConfItems().get("k1"));
+  }
+}
diff --git a/coordinator/src/test/resources/dynamicClientConf.legacy 
b/coordinator/src/test/resources/dynamicClientConf.legacy
new file mode 100644
index 000000000..179756ee1
--- /dev/null
+++ b/coordinator/src/test/resources/dynamicClientConf.legacy
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+k1 v1
+k2 v2
+
+rss.coordinator.remote.storage.path hdfs://x-ns01,hdfs://a-ns01
+rss.coordinator.remote.storage.cluster.conf 
x-ns01,k1=v1,k2=v2;a-ns01,k1=v1,k2=v2
+
diff --git a/coordinator/src/test/resources/dynamicClientConf.yaml 
b/coordinator/src/test/resources/dynamicClientConf.yaml
new file mode 100644
index 000000000..16f9829a7
--- /dev/null
+++ b/coordinator/src/test/resources/dynamicClientConf.yaml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rssClientConf:
+  k1: v1
+  k2: v2
+
+remoteStorageInfos:
+  hdfs://a-ns01: |+
+    <configuration>
+        <property>
+          <name>k1</name>
+          <value>v1</value>
+        </property>
+
+        <property>
+          <name>k2</name>
+          <value>v2</value>
+        </property>
+    </configuration>
+
+  hdfs://x-ns01:
+    k1: v1
+    k2: v2
+
+
+
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHadoopTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/DynamicClientConfServiceHadoopTest.java
similarity index 83%
rename from 
integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHadoopTest.java
rename to 
integration-test/common/src/test/java/org/apache/uniffle/test/DynamicClientConfServiceHadoopTest.java
index af4e5d762..8e3a15cef 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHadoopTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/DynamicClientConfServiceHadoopTest.java
@@ -27,9 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.coordinator.ApplicationManager;
-import org.apache.uniffle.coordinator.ClientConfManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
 import org.apache.uniffle.storage.HadoopTestBase;
 
 import static java.lang.Thread.sleep;
@@ -38,15 +37,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class ClientConfManagerHadoopTest extends HadoopTestBase {
+public class DynamicClientConfServiceHadoopTest extends HadoopTestBase {
 
   @Test
   public void test() throws Exception {
     String cfgFile = HDFS_URI + "/test/client_conf";
-    createAndRunClientConfManagerCases(HDFS_URI, cfgFile, fs, 
HadoopTestBase.conf);
+    createAndRunCases(HDFS_URI, cfgFile, fs, HadoopTestBase.conf);
   }
 
-  public static void createAndRunClientConfManagerCases(
+  public static void createAndRunCases(
       String clusterPathPrefix, String cfgFile, FileSystem fileSystem, 
Configuration hadoopConf)
       throws Exception {
 
@@ -58,7 +57,7 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     // file load checking at startup
     Exception expectedException = null;
     try {
-      new ClientConfManager(conf, new Configuration(), new 
ApplicationManager(conf));
+      new DynamicClientConfService(conf, new Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
     }
@@ -68,9 +67,9 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     Path path = new Path(cfgFile);
     FSDataOutputStream out = fileSystem.create(path);
     conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile);
-    ClientConfManager clientConfManager =
-        new ClientConfManager(conf, new Configuration(), new 
ApplicationManager(conf));
-    assertEquals(0, clientConfManager.getClientConf().size());
+    DynamicClientConfService clientConfManager =
+        new DynamicClientConfService(conf, new Configuration());
+    assertEquals(0, clientConfManager.getRssClientConf().size());
 
     PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
     printWriter.println("spark.mock.1 abc");
@@ -78,9 +77,9 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     printWriter.println("spark.mock.3 true  ");
     printWriter.flush();
     printWriter.close();
-    clientConfManager = new ClientConfManager(conf, hadoopConf, new 
ApplicationManager(conf));
+    clientConfManager = new DynamicClientConfService(conf, hadoopConf);
     sleep(1200);
-    Map<String, String> clientConf = clientConfManager.getClientConf();
+    Map<String, String> clientConf = clientConfManager.getRssClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
     assertEquals("true", clientConf.get("spark.mock.3"));
@@ -92,7 +91,7 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     printWriter.close();
     sleep(1300);
     assertTrue(fileSystem.exists(path));
-    clientConf = clientConfManager.getClientConf();
+    clientConf = clientConfManager.getRssClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
     assertEquals("true", clientConf.get("spark.mock.3"));
@@ -102,7 +101,7 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     fileSystem.delete(path, true);
     assertFalse(fileSystem.exists(path));
     sleep(1200);
-    clientConf = clientConfManager.getClientConf();
+    clientConf = clientConfManager.getRssClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
     assertEquals("true", clientConf.get("spark.mock.3"));
@@ -119,7 +118,7 @@ public class ClientConfManagerHadoopTest extends 
HadoopTestBase {
     printWriter.close();
     fileSystem.rename(tmpPath, path);
     sleep(1200);
-    clientConf = clientConfManager.getClientConf();
+    clientConf = clientConfManager.getRssClientConf();
     assertEquals("deadbeaf", clientConf.get("spark.mock.4"));
     assertEquals("9527", clientConf.get("spark.mock.5"));
     assertEquals(2, clientConf.size());
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/DynamicClientConfServiceKerberlizedHadoopTest.java
similarity index 86%
rename from 
integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
rename to 
integration-test/common/src/test/java/org/apache/uniffle/test/DynamicClientConfServiceKerberlizedHadoopTest.java
index 6e06a3ff9..c4c2b575f 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHadoopTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/DynamicClientConfServiceKerberlizedHadoopTest.java
@@ -22,18 +22,18 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.KerberizedHadoopBase;
 
-public class ClientConfManagerKerberlizedHadoopTest extends 
KerberizedHadoopBase {
+public class DynamicClientConfServiceKerberlizedHadoopTest extends 
KerberizedHadoopBase {
 
   @BeforeAll
   public static void beforeAll() throws Exception {
-    testRunner = ClientConfManagerKerberlizedHadoopTest.class;
+    testRunner = DynamicClientConfServiceKerberlizedHadoopTest.class;
     KerberizedHadoopBase.init();
   }
 
   @Test
   public void testConfInHadoop() throws Exception {
     String cfgFile = kerberizedHadoop.getSchemeAndAuthorityPrefix() + 
"/test/client_conf";
-    ClientConfManagerHadoopTest.createAndRunClientConfManagerCases(
+    DynamicClientConfServiceHadoopTest.createAndRunCases(
         kerberizedHadoop.getSchemeAndAuthorityPrefix(),
         cfgFile,
         kerberizedHadoop.getFileSystem(),
diff --git a/pom.xml b/pom.xml
index 87806e565..ddcf695b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
     <skipUTs>${skipTests}</skipUTs>
     <skipITs>${skipTests}</skipITs>
     <skipBuildImage>true</skipBuildImage>
+    <snakeyaml.version>2.2</snakeyaml.version>
   </properties>
 
   <repositories>
@@ -731,6 +732,11 @@
         <artifactId>hbase-shaded-jersey</artifactId>
         <version>${hbase.thirdparty.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.yaml</groupId>
+        <artifactId>snakeyaml</artifactId>
+        <version>${snakeyaml.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

Reply via email to