This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 93f43473a [#1709] feat(coordinator): Introduce pluggable 
`ClientConfApplyStrategy` for `fetchClientConf` rpc (#1710)
93f43473a is described below

commit 93f43473a5fb9f3ad028ce9df5896c084bd4803a
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu May 16 15:40:49 2024 +0800

    [#1709] feat(coordinator): Introduce pluggable `ClientConfApplyStrategy` 
for `fetchClientConf` rpc (#1710)
    
    ### What changes were proposed in this pull request?
    
    Introduce pluggable ClientConfApplyManager for fetchClientConf rpc
    
    ### Why are the changes needed?
    
    For #1709
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Unit tests
---
 .../shuffle/manager/RssShuffleManagerBase.java     |  13 ++-
 .../org/apache/uniffle/common/util/RssUtils.java   |   8 ++
 .../uniffle/coordinator/CoordinatorConf.java       |   7 ++
 .../coordinator/CoordinatorGrpcService.java        |  22 +++-
 .../uniffle/coordinator/CoordinatorServer.java     |  17 ++--
 .../conf/AbstractRssClientConfApplyStrategy.java   |  16 +--
 .../conf/BypassRssClientConfApplyStrategy.java     |  16 +--
 .../conf/RssClientConfApplyManager.java            |  62 +++++++++++
 .../coordinator/conf/RssClientConfFetchInfo.java   |  51 ++++++++++
 .../conf/RssClientConfApplyManagerTest.java        | 113 +++++++++++++++++++++
 .../client/impl/grpc/CoordinatorGrpcClient.java    |   2 +-
 .../client/request/RssFetchClientConfRequest.java  |  33 ++++++
 proto/src/main/proto/Rss.proto                     |  11 ++
 13 files changed, 343 insertions(+), 28 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 6a9baecad..77cb173e3 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.shuffle.manager;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -29,6 +30,7 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.MapOutputTracker;
 import org.apache.spark.MapOutputTrackerMaster;
 import org.apache.spark.SparkConf;
@@ -356,9 +358,16 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
         sparkConf.getInt(
             RssSparkConfig.RSS_ACCESS_TIMEOUT_MS.key(),
             RssSparkConfig.RSS_ACCESS_TIMEOUT_MS.defaultValue().get());
+    String user;
+    try {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (Exception e) {
+      throw new RssException("Errors on getting current user.", e);
+    }
+    RssFetchClientConfRequest request =
+        new RssFetchClientConfRequest(timeoutMs, user, Collections.emptyMap());
     for (CoordinatorClient client : coordinatorClients) {
-      RssFetchClientConfResponse response =
-          client.fetchClientConf(new RssFetchClientConfRequest(timeoutMs));
+      RssFetchClientConfResponse response = client.fetchClientConf(request);
       if (response.getStatusCode() == StatusCode.SUCCESS) {
         LOG.info("Success to get conf from {}", client.getDesc());
         RssSparkShuffleUtils.applyDynamicClientConf(sparkConf, 
response.getClientConf());
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 7026320aa..5c65f5eb9 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -274,6 +274,14 @@ public class RssUtils {
         Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId), 
String.valueOf(partition));
   }
 
+  public static <T> T loadExtension(Class<T> extCls, String clsPackage, Object 
obj) {
+    List<T> exts = loadExtensions(extCls, Arrays.asList(clsPackage), obj);
+    if (exts != null && exts.size() == 1) {
+      return exts.get(0);
+    }
+    throw new IllegalArgumentException("No such extension for " + clsPackage);
+  }
+
   @SuppressWarnings("unchecked")
   public static <T> List<T> loadExtensions(Class<T> extClass, List<String> 
classes, Object obj) {
     if (classes == null || classes.isEmpty()) {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 7e72036c1..e68b4cbb0 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -36,6 +36,13 @@ import static 
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrat
  */
 public class CoordinatorConf extends RssBaseConf {
 
+  public static final ConfigOption<String> 
COORDINATOR_CLIENT_CONF_APPLY_STRATEGY =
+      ConfigOptions.key("rss.coordinator.client.confApplyStrategy")
+          .stringType()
+          
.defaultValue("org.apache.uniffle.coordinator.conf.BypassRssClientConfApplyStrategy")
+          .withDescription(
+              "The client conf apply strategy which is used on fetchClientConf 
rpc interface.");
+
   public static final ConfigOption<String> COORDINATOR_EXCLUDE_NODES_FILE_PATH 
=
       ConfigOptions.key("rss.coordinator.exclude.nodes.file.path")
           .stringType()
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index e5bc88fcb..0b4b9d69e 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -38,7 +38,7 @@ import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
-import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
+import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
 import 
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 import org.apache.uniffle.proto.CoordinatorServerGrpc;
@@ -50,6 +50,7 @@ import 
org.apache.uniffle.proto.RssProtos.ApplicationInfoRequest;
 import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
 import org.apache.uniffle.proto.RssProtos.CheckServiceAvailableResponse;
 import org.apache.uniffle.proto.RssProtos.ClientConfItem;
+import org.apache.uniffle.proto.RssProtos.FetchClientConfRequest;
 import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
 import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageRequest;
 import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageResponse;
@@ -297,9 +298,22 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     responseObserver.onCompleted();
   }
 
+  /** To be compatible with the older client version. */
   @Override
   public void fetchClientConf(
       Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
+    fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, 
responseObserver);
+  }
+
+  @Override
+  public void fetchClientConfV2(
+      FetchClientConfRequest request, StreamObserver<FetchClientConfResponse> 
responseObserver) {
+    fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), 
responseObserver);
+  }
+
+  private void fetchClientConfImpl(
+      RssClientConfFetchInfo rssClientConfFetchInfo,
+      StreamObserver<FetchClientConfResponse> responseObserver) {
     FetchClientConfResponse response;
     FetchClientConfResponse.Builder builder =
         FetchClientConfResponse.newBuilder().setStatus(StatusCode.SUCCESS);
@@ -308,9 +322,9 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
             .getCoordinatorConf()
             
.getBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED);
     if (dynamicConfEnabled) {
-      DynamicClientConfService dynamicClientConfService =
-          coordinatorServer.getDynamicClientConfService();
-      for (Map.Entry<String, String> kv : 
dynamicClientConfService.getRssClientConf().entrySet()) {
+      Map<String, String> clientConfigs =
+          
coordinatorServer.getClientConfApplyManager().apply(rssClientConfFetchInfo);
+      for (Map.Entry<String, String> kv : clientConfigs.entrySet()) {
         builder.addClientConf(
             
ClientConfItem.newBuilder().setKey(kv.getKey()).setValue(kv.getValue()).build());
       }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index c64156b65..1a6308ed8 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -41,6 +41,7 @@ import 
org.apache.uniffle.common.web.CoalescedCollectorRegistry;
 import org.apache.uniffle.common.web.JettyServer;
 import org.apache.uniffle.coordinator.conf.ClientConf;
 import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
+import org.apache.uniffle.coordinator.conf.RssClientConfApplyManager;
 import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
@@ -63,7 +64,7 @@ public class CoordinatorServer extends ReconfigurableBase {
   private ServerInterface server;
   private ClusterManager clusterManager;
   private AssignmentStrategy assignmentStrategy;
-  private DynamicClientConfService dynamicClientConfService;
+  private RssClientConfApplyManager clientConfApplyManager;
   private AccessManager accessManager;
   private ApplicationManager applicationManager;
   private GRPCMetrics grpcMetrics;
@@ -137,8 +138,8 @@ public class CoordinatorServer extends ReconfigurableBase {
     if (accessManager != null) {
       accessManager.close();
     }
-    if (dynamicClientConfService != null) {
-      dynamicClientConfService.close();
+    if (clientConfApplyManager != null) {
+      clientConfApplyManager.close();
     }
     if (metricReporter != null) {
       metricReporter.stop();
@@ -181,11 +182,15 @@ public class CoordinatorServer extends ReconfigurableBase 
{
         new ClusterManagerFactory(coordinatorConf, hadoopConf);
 
     this.clusterManager = clusterManagerFactory.getClusterManager();
-    this.dynamicClientConfService =
+
+    DynamicClientConfService dynamicClientConfService =
         new DynamicClientConfService(
             coordinatorConf,
             hadoopConf,
             new Consumer[] {(Consumer<ClientConf>) 
applicationManager::refreshRemoteStorages});
+    this.clientConfApplyManager =
+        new RssClientConfApplyManager(coordinatorConf, 
dynamicClientConfService);
+
     AssignmentStrategyFactory assignmentStrategyFactory =
         new AssignmentStrategyFactory(coordinatorConf, clusterManager);
     this.assignmentStrategy = 
assignmentStrategyFactory.getAssignmentStrategy();
@@ -255,8 +260,8 @@ public class CoordinatorServer extends ReconfigurableBase {
     return accessManager;
   }
 
-  public DynamicClientConfService getDynamicClientConfService() {
-    return dynamicClientConfService;
+  public RssClientConfApplyManager getClientConfApplyManager() {
+    return clientConfApplyManager;
   }
 
   public GRPCMetrics getGrpcMetrics() {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/AbstractRssClientConfApplyStrategy.java
similarity index 64%
copy from 
internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
copy to 
coordinator/src/main/java/org/apache/uniffle/coordinator/conf/AbstractRssClientConfApplyStrategy.java
index b1deb5fc1..ca0aea9ea 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/AbstractRssClientConfApplyStrategy.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.client.request;
+package org.apache.uniffle.coordinator.conf;
 
-public class RssFetchClientConfRequest {
-  private final int timeoutMs;
+import java.util.Map;
 
-  public RssFetchClientConfRequest(int timeoutMs) {
-    this.timeoutMs = timeoutMs;
-  }
+public abstract class AbstractRssClientConfApplyStrategy {
+  protected DynamicClientConfService dynamicClientConfService;
 
-  public int getTimeoutMs() {
-    return timeoutMs;
+  protected AbstractRssClientConfApplyStrategy(DynamicClientConfService 
dynamicClientConfService) {
+    this.dynamicClientConfService = dynamicClientConfService;
   }
+
+  abstract Map<String, String> apply(RssClientConfFetchInfo 
rssClientConfFetchInfo);
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/BypassRssClientConfApplyStrategy.java
similarity index 65%
copy from 
internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
copy to 
coordinator/src/main/java/org/apache/uniffle/coordinator/conf/BypassRssClientConfApplyStrategy.java
index b1deb5fc1..a5b1c978a 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/BypassRssClientConfApplyStrategy.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.client.request;
+package org.apache.uniffle.coordinator.conf;
 
-public class RssFetchClientConfRequest {
-  private final int timeoutMs;
+import java.util.Map;
 
-  public RssFetchClientConfRequest(int timeoutMs) {
-    this.timeoutMs = timeoutMs;
+public class BypassRssClientConfApplyStrategy extends 
AbstractRssClientConfApplyStrategy {
+
+  public BypassRssClientConfApplyStrategy(DynamicClientConfService 
dynamicClientConfService) {
+    super(dynamicClientConfService);
   }
 
-  public int getTimeoutMs() {
-    return timeoutMs;
+  @Override
+  Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo) {
+    return dynamicClientConfService.getRssClientConf();
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManager.java
new file mode 100644
index 000000000..ab99ecdc3
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+
+import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_CLIENT_CONF_APPLY_STRATEGY;
+
+public class RssClientConfApplyManager implements Closeable {
+  private final AbstractRssClientConfApplyStrategy strategy;
+  private final DynamicClientConfService dynamicClientConfService;
+
+  public RssClientConfApplyManager(
+      CoordinatorConf conf, DynamicClientConfService dynamicClientConfService) 
{
+    this.dynamicClientConfService = dynamicClientConfService;
+
+    String strategyCls = conf.get(COORDINATOR_CLIENT_CONF_APPLY_STRATEGY);
+    this.strategy =
+        RssUtils.loadExtension(
+            AbstractRssClientConfApplyStrategy.class, strategyCls, 
dynamicClientConfService);
+  }
+
+  public Map<String, String> apply(RssClientConfFetchInfo 
rssClientConfFetchInfo) {
+    // to be compatible with the older client version.
+    if (rssClientConfFetchInfo.isEmpty()) {
+      return dynamicClientConfService.getRssClientConf();
+    }
+    return strategy.apply(rssClientConfFetchInfo);
+  }
+
+  @VisibleForTesting
+  protected AbstractRssClientConfApplyStrategy getStrategy() {
+    return strategy;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // ignore.
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfFetchInfo.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfFetchInfo.java
new file mode 100644
index 000000000..5297fe7fd
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfFetchInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.util.Map;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssClientConfFetchInfo {
+  private String user;
+  private Map<String, String> properties;
+
+  public static final RssClientConfFetchInfo EMPTY_CLIENT_CONF_FETCH_INFO =
+      new RssClientConfFetchInfo(null, null);
+
+  public RssClientConfFetchInfo(String user, Map<String, String> properties) {
+    this.user = user;
+    this.properties = properties;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public boolean isEmpty() {
+    return user == null && (properties == null || properties.isEmpty());
+  }
+
+  public static RssClientConfFetchInfo 
fromProto(RssProtos.FetchClientConfRequest request) {
+    return new RssClientConfFetchInfo(request.getUser(), 
request.getPropertiesMap());
+  }
+}
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManagerTest.java
new file mode 100644
index 000000000..67b206dd7
--- /dev/null
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManagerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
+import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_CLIENT_CONF_APPLY_STRATEGY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RssClientConfApplyManagerTest {
+
+  @BeforeEach
+  public void setUp() {
+    CoordinatorMetrics.register();
+  }
+
+  @AfterEach
+  public void clear() {
+    CoordinatorMetrics.clear();
+  }
+
+  @Test
+  public void testBypassApply() {
+    DynamicClientConfService dynamicClientConfService = 
mock(DynamicClientConfService.class);
+    Map<String, String> conf = new HashMap<>();
+    conf.put("a", "b");
+    when(dynamicClientConfService.getRssClientConf()).thenReturn(conf);
+
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+
+    RssClientConfApplyManager applyManager =
+        new RssClientConfApplyManager(coordinatorConf, 
dynamicClientConfService);
+
+    assertEquals(conf, 
applyManager.apply(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO));
+    assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("a", 
Collections.emptyMap())));
+  }
+
+  public static final class MockedRssClientConfApplyStrategy
+      extends AbstractRssClientConfApplyStrategy {
+    private Set<String> legalUsers;
+
+    public MockedRssClientConfApplyStrategy(DynamicClientConfService 
dynamicClientConfService) {
+      super(dynamicClientConfService);
+    }
+
+    public void setLegalUsers(Set<String> legalUsers) {
+      this.legalUsers = legalUsers;
+    }
+
+    @Override
+    Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo) {
+      if (legalUsers.contains(rssClientConfFetchInfo.getUser())) {
+        return dynamicClientConfService.getRssClientConf();
+      }
+      return Collections.EMPTY_MAP;
+    }
+  }
+
+  @Test
+  public void testCustomizeApplyStrategy() {
+    DynamicClientConfService dynamicClientConfService = 
mock(DynamicClientConfService.class);
+    Map<String, String> conf = new HashMap<>();
+    conf.put("a", "b");
+    when(dynamicClientConfService.getRssClientConf()).thenReturn(conf);
+
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    coordinatorConf.set(
+        COORDINATOR_CLIENT_CONF_APPLY_STRATEGY, 
MockedRssClientConfApplyStrategy.class.getName());
+
+    RssClientConfApplyManager applyManager =
+        new RssClientConfApplyManager(coordinatorConf, 
dynamicClientConfService);
+    MockedRssClientConfApplyStrategy strategy =
+        (MockedRssClientConfApplyStrategy) applyManager.getStrategy();
+    strategy.setLegalUsers(Sets.newHashSet("a", "b", "c"));
+
+    // if using the older client version, it will always return the conf
+    assertEquals(conf, 
applyManager.apply(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO));
+
+    assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("a", 
Collections.emptyMap())));
+    assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("b", 
Collections.emptyMap())));
+    assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("c", 
Collections.emptyMap())));
+    assertEquals(
+        0, applyManager.apply(new RssClientConfFetchInfo("d", 
Collections.emptyMap())).size());
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 728b184ec..be6e70df3 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -351,7 +351,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       rpcResponse =
           blockingStub
               .withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS)
-              .fetchClientConf(Empty.getDefaultInstance());
+              .fetchClientConfV2(request.toProto());
       Map<String, String> clientConf =
           rpcResponse.getClientConfList().stream()
               .collect(Collectors.toMap(ClientConfItem::getKey, 
ClientConfItem::getValue));
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
index b1deb5fc1..f9c381447 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
@@ -17,14 +17,47 @@
 
 package org.apache.uniffle.client.request;
 
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.uniffle.proto.RssProtos;
+
 public class RssFetchClientConfRequest {
   private final int timeoutMs;
+  private String user;
+  private Map<String, String> properties = Collections.emptyMap();
+
+  public RssFetchClientConfRequest(int timeoutMs, String user, Map<String, 
String> properties) {
+    this.timeoutMs = timeoutMs;
+    this.user = user;
+    this.properties = properties;
+  }
 
+  @VisibleForTesting
   public RssFetchClientConfRequest(int timeoutMs) {
     this.timeoutMs = timeoutMs;
+    this.user = StringUtils.EMPTY;
   }
 
   public int getTimeoutMs() {
     return timeoutMs;
   }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public RssProtos.FetchClientConfRequest toProto() {
+    return RssProtos.FetchClientConfRequest.newBuilder()
+        .setUser(user)
+        .putAllProperties(properties)
+        .build();
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d8d384f35..5b3d9e133 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -367,6 +367,7 @@ service CoordinatorServer {
 
   // Get basic client conf from coordinator
   rpc fetchClientConf(google.protobuf.Empty) returns (FetchClientConfResponse);
+  rpc fetchClientConfV2(FetchClientConfRequest) returns 
(FetchClientConfResponse);
 
   // Get remote storage from coordinator
   rpc fetchRemoteStorage(FetchRemoteStorageRequest) returns 
(FetchRemoteStorageResponse);
@@ -463,6 +464,16 @@ message AccessClusterResponse {
   string uuid = 3;
 }
 
+message FetchClientConfRequest {
+  string user = 1;
+  /**
+   For the potential extension for customize delegation shuffle manager.
+   You could put more internal properties in customize delegation shuffle 
manager,
+   and then it could be as metadata in your own customize 
ClientConfApplyStrategy.
+   */
+  map<string, string> properties = 2;
+}
+
 message FetchClientConfResponse {
   StatusCode status = 1;
   string retMsg = 2;

Reply via email to