This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 93f43473a [#1709] feat(coordinator): Introduce pluggable
`ClientConfApplyStrategy` for `fetchClientConf` rpc (#1710)
93f43473a is described below
commit 93f43473a5fb9f3ad028ce9df5896c084bd4803a
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu May 16 15:40:49 2024 +0800
[#1709] feat(coordinator): Introduce pluggable `ClientConfApplyStrategy`
for `fetchClientConf` rpc (#1710)
### What changes were proposed in this pull request?
Introduce a pluggable `ClientConfApplyStrategy`, loaded by `RssClientConfApplyManager`, for the `fetchClientConf` rpc
### Why are the changes needed?
For #1709
### Does this PR introduce _any_ user-facing change?
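Yes. It adds the coordinator option `rss.coordinator.client.confApplyStrategy` and a new `fetchClientConfV2` rpc.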
### How was this patch tested?
Unit tests
---
.../shuffle/manager/RssShuffleManagerBase.java | 13 ++-
.../org/apache/uniffle/common/util/RssUtils.java | 8 ++
.../uniffle/coordinator/CoordinatorConf.java | 7 ++
.../coordinator/CoordinatorGrpcService.java | 22 +++-
.../uniffle/coordinator/CoordinatorServer.java | 17 ++--
.../conf/AbstractRssClientConfApplyStrategy.java | 16 +--
.../conf/BypassRssClientConfApplyStrategy.java | 16 +--
.../conf/RssClientConfApplyManager.java | 62 +++++++++++
.../coordinator/conf/RssClientConfFetchInfo.java | 51 ++++++++++
.../conf/RssClientConfApplyManagerTest.java | 113 +++++++++++++++++++++
.../client/impl/grpc/CoordinatorGrpcClient.java | 2 +-
.../client/request/RssFetchClientConfRequest.java | 33 ++++++
proto/src/main/proto/Rss.proto | 11 ++
13 files changed, 343 insertions(+), 28 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 6a9baecad..77cb173e3 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.shuffle.manager;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -29,6 +30,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.MapOutputTrackerMaster;
import org.apache.spark.SparkConf;
@@ -356,9 +358,16 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
sparkConf.getInt(
RssSparkConfig.RSS_ACCESS_TIMEOUT_MS.key(),
RssSparkConfig.RSS_ACCESS_TIMEOUT_MS.defaultValue().get());
+ String user;
+ try {
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (Exception e) {
+ throw new RssException("Errors on getting current user.", e);
+ }
+ RssFetchClientConfRequest request =
+ new RssFetchClientConfRequest(timeoutMs, user, Collections.emptyMap());
for (CoordinatorClient client : coordinatorClients) {
- RssFetchClientConfResponse response =
- client.fetchClientConf(new RssFetchClientConfRequest(timeoutMs));
+ RssFetchClientConfResponse response = client.fetchClientConf(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Success to get conf from {}", client.getDesc());
RssSparkShuffleUtils.applyDynamicClientConf(sparkConf,
response.getClientConf());
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 7026320aa..5c65f5eb9 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -274,6 +274,14 @@ public class RssUtils {
Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId),
String.valueOf(partition));
}
+ public static <T> T loadExtension(Class<T> extCls, String clsPackage, Object
obj) {
+ List<T> exts = loadExtensions(extCls, Arrays.asList(clsPackage), obj);
+ if (exts != null && exts.size() == 1) {
+ return exts.get(0);
+ }
+ throw new IllegalArgumentException("No such extension for " + clsPackage);
+ }
+
@SuppressWarnings("unchecked")
public static <T> List<T> loadExtensions(Class<T> extClass, List<String>
classes, Object obj) {
if (classes == null || classes.isEmpty()) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 7e72036c1..e68b4cbb0 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -36,6 +36,13 @@ import static
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrat
*/
public class CoordinatorConf extends RssBaseConf {
+ public static final ConfigOption<String>
COORDINATOR_CLIENT_CONF_APPLY_STRATEGY =
+ ConfigOptions.key("rss.coordinator.client.confApplyStrategy")
+ .stringType()
+
.defaultValue("org.apache.uniffle.coordinator.conf.BypassRssClientConfApplyStrategy")
+ .withDescription(
+ "The client conf apply strategy which is used on fetchClientConf
rpc interface.");
+
public static final ConfigOption<String> COORDINATOR_EXCLUDE_NODES_FILE_PATH
=
ConfigOptions.key("rss.coordinator.exclude.nodes.file.path")
.stringType()
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index e5bc88fcb..0b4b9d69e 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -38,7 +38,7 @@ import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
-import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
+import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
import
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
@@ -50,6 +50,7 @@ import
org.apache.uniffle.proto.RssProtos.ApplicationInfoRequest;
import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
import org.apache.uniffle.proto.RssProtos.CheckServiceAvailableResponse;
import org.apache.uniffle.proto.RssProtos.ClientConfItem;
+import org.apache.uniffle.proto.RssProtos.FetchClientConfRequest;
import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageRequest;
import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageResponse;
@@ -297,9 +298,22 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
responseObserver.onCompleted();
}
+ /** To be compatible with the older client version. */
@Override
public void fetchClientConf(
Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
+ fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO,
responseObserver);
+ }
+
+ @Override
+ public void fetchClientConfV2(
+ FetchClientConfRequest request, StreamObserver<FetchClientConfResponse>
responseObserver) {
+ fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request),
responseObserver);
+ }
+
+ private void fetchClientConfImpl(
+ RssClientConfFetchInfo rssClientConfFetchInfo,
+ StreamObserver<FetchClientConfResponse> responseObserver) {
FetchClientConfResponse response;
FetchClientConfResponse.Builder builder =
FetchClientConfResponse.newBuilder().setStatus(StatusCode.SUCCESS);
@@ -308,9 +322,9 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
.getCoordinatorConf()
.getBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED);
if (dynamicConfEnabled) {
- DynamicClientConfService dynamicClientConfService =
- coordinatorServer.getDynamicClientConfService();
- for (Map.Entry<String, String> kv :
dynamicClientConfService.getRssClientConf().entrySet()) {
+ Map<String, String> clientConfigs =
+
coordinatorServer.getClientConfApplyManager().apply(rssClientConfFetchInfo);
+ for (Map.Entry<String, String> kv : clientConfigs.entrySet()) {
builder.addClientConf(
ClientConfItem.newBuilder().setKey(kv.getKey()).setValue(kv.getValue()).build());
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index c64156b65..1a6308ed8 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -41,6 +41,7 @@ import
org.apache.uniffle.common.web.CoalescedCollectorRegistry;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.coordinator.conf.ClientConf;
import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
+import org.apache.uniffle.coordinator.conf.RssClientConfApplyManager;
import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
@@ -63,7 +64,7 @@ public class CoordinatorServer extends ReconfigurableBase {
private ServerInterface server;
private ClusterManager clusterManager;
private AssignmentStrategy assignmentStrategy;
- private DynamicClientConfService dynamicClientConfService;
+ private RssClientConfApplyManager clientConfApplyManager;
private AccessManager accessManager;
private ApplicationManager applicationManager;
private GRPCMetrics grpcMetrics;
@@ -137,8 +138,8 @@ public class CoordinatorServer extends ReconfigurableBase {
if (accessManager != null) {
accessManager.close();
}
- if (dynamicClientConfService != null) {
- dynamicClientConfService.close();
+ if (clientConfApplyManager != null) {
+ clientConfApplyManager.close();
}
if (metricReporter != null) {
metricReporter.stop();
@@ -181,11 +182,15 @@ public class CoordinatorServer extends ReconfigurableBase
{
new ClusterManagerFactory(coordinatorConf, hadoopConf);
this.clusterManager = clusterManagerFactory.getClusterManager();
- this.dynamicClientConfService =
+
+ DynamicClientConfService dynamicClientConfService =
new DynamicClientConfService(
coordinatorConf,
hadoopConf,
new Consumer[] {(Consumer<ClientConf>)
applicationManager::refreshRemoteStorages});
+ this.clientConfApplyManager =
+ new RssClientConfApplyManager(coordinatorConf,
dynamicClientConfService);
+
AssignmentStrategyFactory assignmentStrategyFactory =
new AssignmentStrategyFactory(coordinatorConf, clusterManager);
this.assignmentStrategy =
assignmentStrategyFactory.getAssignmentStrategy();
@@ -255,8 +260,8 @@ public class CoordinatorServer extends ReconfigurableBase {
return accessManager;
}
- public DynamicClientConfService getDynamicClientConfService() {
- return dynamicClientConfService;
+ public RssClientConfApplyManager getClientConfApplyManager() {
+ return clientConfApplyManager;
}
public GRPCMetrics getGrpcMetrics() {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/AbstractRssClientConfApplyStrategy.java
similarity index 64%
copy from
internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
copy to
coordinator/src/main/java/org/apache/uniffle/coordinator/conf/AbstractRssClientConfApplyStrategy.java
index b1deb5fc1..ca0aea9ea 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/AbstractRssClientConfApplyStrategy.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.uniffle.client.request;
+package org.apache.uniffle.coordinator.conf;
-public class RssFetchClientConfRequest {
- private final int timeoutMs;
+import java.util.Map;
- public RssFetchClientConfRequest(int timeoutMs) {
- this.timeoutMs = timeoutMs;
- }
+public abstract class AbstractRssClientConfApplyStrategy {
+ protected DynamicClientConfService dynamicClientConfService;
- public int getTimeoutMs() {
- return timeoutMs;
+ protected AbstractRssClientConfApplyStrategy(DynamicClientConfService
dynamicClientConfService) {
+ this.dynamicClientConfService = dynamicClientConfService;
}
+
+ abstract Map<String, String> apply(RssClientConfFetchInfo
rssClientConfFetchInfo);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/BypassRssClientConfApplyStrategy.java
similarity index 65%
copy from
internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
copy to
coordinator/src/main/java/org/apache/uniffle/coordinator/conf/BypassRssClientConfApplyStrategy.java
index b1deb5fc1..a5b1c978a 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/BypassRssClientConfApplyStrategy.java
@@ -15,16 +15,18 @@
* limitations under the License.
*/
-package org.apache.uniffle.client.request;
+package org.apache.uniffle.coordinator.conf;
-public class RssFetchClientConfRequest {
- private final int timeoutMs;
+import java.util.Map;
- public RssFetchClientConfRequest(int timeoutMs) {
- this.timeoutMs = timeoutMs;
+public class BypassRssClientConfApplyStrategy extends
AbstractRssClientConfApplyStrategy {
+
+ public BypassRssClientConfApplyStrategy(DynamicClientConfService
dynamicClientConfService) {
+ super(dynamicClientConfService);
}
- public int getTimeoutMs() {
- return timeoutMs;
+ @Override
+ Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo) {
+ return dynamicClientConfService.getRssClientConf();
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManager.java
new file mode 100644
index 000000000..ab99ecdc3
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+
+import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_CLIENT_CONF_APPLY_STRATEGY;
+
+public class RssClientConfApplyManager implements Closeable {
+ private final AbstractRssClientConfApplyStrategy strategy;
+ private final DynamicClientConfService dynamicClientConfService;
+
+ public RssClientConfApplyManager(
+ CoordinatorConf conf, DynamicClientConfService dynamicClientConfService)
{
+ this.dynamicClientConfService = dynamicClientConfService;
+
+ String strategyCls = conf.get(COORDINATOR_CLIENT_CONF_APPLY_STRATEGY);
+ this.strategy =
+ RssUtils.loadExtension(
+ AbstractRssClientConfApplyStrategy.class, strategyCls,
dynamicClientConfService);
+ }
+
+ public Map<String, String> apply(RssClientConfFetchInfo
rssClientConfFetchInfo) {
+ // to be compatible with the older client version.
+ if (rssClientConfFetchInfo.isEmpty()) {
+ return dynamicClientConfService.getRssClientConf();
+ }
+ return strategy.apply(rssClientConfFetchInfo);
+ }
+
+ @VisibleForTesting
+ protected AbstractRssClientConfApplyStrategy getStrategy() {
+ return strategy;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // ignore.
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfFetchInfo.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfFetchInfo.java
new file mode 100644
index 000000000..5297fe7fd
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/conf/RssClientConfFetchInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.util.Map;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssClientConfFetchInfo {
+ private String user;
+ private Map<String, String> properties;
+
+ public static final RssClientConfFetchInfo EMPTY_CLIENT_CONF_FETCH_INFO =
+ new RssClientConfFetchInfo(null, null);
+
+ public RssClientConfFetchInfo(String user, Map<String, String> properties) {
+ this.user = user;
+ this.properties = properties;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public boolean isEmpty() {
+ return user == null && (properties == null || properties.isEmpty());
+ }
+
+ public static RssClientConfFetchInfo
fromProto(RssProtos.FetchClientConfRequest request) {
+ return new RssClientConfFetchInfo(request.getUser(),
request.getPropertiesMap());
+ }
+}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManagerTest.java
new file mode 100644
index 000000000..67b206dd7
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/RssClientConfApplyManagerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.conf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
+import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_CLIENT_CONF_APPLY_STRATEGY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RssClientConfApplyManagerTest {
+
+ @BeforeEach
+ public void setUp() {
+ CoordinatorMetrics.register();
+ }
+
+ @AfterEach
+ public void clear() {
+ CoordinatorMetrics.clear();
+ }
+
+ @Test
+ public void testBypassApply() {
+ DynamicClientConfService dynamicClientConfService =
mock(DynamicClientConfService.class);
+ Map<String, String> conf = new HashMap<>();
+ conf.put("a", "b");
+ when(dynamicClientConfService.getRssClientConf()).thenReturn(conf);
+
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+
+ RssClientConfApplyManager applyManager =
+ new RssClientConfApplyManager(coordinatorConf,
dynamicClientConfService);
+
+ assertEquals(conf,
applyManager.apply(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO));
+ assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("a",
Collections.emptyMap())));
+ }
+
+ public static final class MockedRssClientConfApplyStrategy
+ extends AbstractRssClientConfApplyStrategy {
+ private Set<String> legalUsers;
+
+ public MockedRssClientConfApplyStrategy(DynamicClientConfService
dynamicClientConfService) {
+ super(dynamicClientConfService);
+ }
+
+ public void setLegalUsers(Set<String> legalUsers) {
+ this.legalUsers = legalUsers;
+ }
+
+ @Override
+ Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo) {
+ if (legalUsers.contains(rssClientConfFetchInfo.getUser())) {
+ return dynamicClientConfService.getRssClientConf();
+ }
+ return Collections.EMPTY_MAP;
+ }
+ }
+
+ @Test
+ public void testCustomizeApplyStrategy() {
+ DynamicClientConfService dynamicClientConfService =
mock(DynamicClientConfService.class);
+ Map<String, String> conf = new HashMap<>();
+ conf.put("a", "b");
+ when(dynamicClientConfService.getRssClientConf()).thenReturn(conf);
+
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ coordinatorConf.set(
+ COORDINATOR_CLIENT_CONF_APPLY_STRATEGY,
MockedRssClientConfApplyStrategy.class.getName());
+
+ RssClientConfApplyManager applyManager =
+ new RssClientConfApplyManager(coordinatorConf,
dynamicClientConfService);
+ MockedRssClientConfApplyStrategy strategy =
+ (MockedRssClientConfApplyStrategy) applyManager.getStrategy();
+ strategy.setLegalUsers(Sets.newHashSet("a", "b", "c"));
+
+ // if using the older client version, it will always return the conf
+ assertEquals(conf,
applyManager.apply(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO));
+
+ assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("a",
Collections.emptyMap())));
+ assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("b",
Collections.emptyMap())));
+ assertEquals(conf, applyManager.apply(new RssClientConfFetchInfo("c",
Collections.emptyMap())));
+ assertEquals(
+ 0, applyManager.apply(new RssClientConfFetchInfo("d",
Collections.emptyMap())).size());
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 728b184ec..be6e70df3 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -351,7 +351,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
rpcResponse =
blockingStub
.withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS)
- .fetchClientConf(Empty.getDefaultInstance());
+ .fetchClientConfV2(request.toProto());
Map<String, String> clientConf =
rpcResponse.getClientConfList().stream()
.collect(Collectors.toMap(ClientConfItem::getKey,
ClientConfItem::getValue));
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
index b1deb5fc1..f9c381447 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssFetchClientConfRequest.java
@@ -17,14 +17,47 @@
package org.apache.uniffle.client.request;
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.uniffle.proto.RssProtos;
+
public class RssFetchClientConfRequest {
private final int timeoutMs;
+ private String user;
+ private Map<String, String> properties = Collections.emptyMap();
+
+ public RssFetchClientConfRequest(int timeoutMs, String user, Map<String,
String> properties) {
+ this.timeoutMs = timeoutMs;
+ this.user = user;
+ this.properties = properties;
+ }
+ @VisibleForTesting
public RssFetchClientConfRequest(int timeoutMs) {
this.timeoutMs = timeoutMs;
+ this.user = StringUtils.EMPTY;
}
public int getTimeoutMs() {
return timeoutMs;
}
+
+ public String getUser() {
+ return user;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public RssProtos.FetchClientConfRequest toProto() {
+ return RssProtos.FetchClientConfRequest.newBuilder()
+ .setUser(user)
+ .putAllProperties(properties)
+ .build();
+ }
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d8d384f35..5b3d9e133 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -367,6 +367,7 @@ service CoordinatorServer {
// Get basic client conf from coordinator
rpc fetchClientConf(google.protobuf.Empty) returns (FetchClientConfResponse);
+ rpc fetchClientConfV2(FetchClientConfRequest) returns
(FetchClientConfResponse);
// Get remote storage from coordinator
rpc fetchRemoteStorage(FetchRemoteStorageRequest) returns
(FetchRemoteStorageResponse);
@@ -463,6 +464,16 @@ message AccessClusterResponse {
string uuid = 3;
}
+message FetchClientConfRequest {
+ string user = 1;
+ /**
+ Reserved for extension by a customized delegation shuffle manager.
+ Such a manager can put extra internal properties here, and a customized
+ ClientConfApplyStrategy can then read them as metadata.
+ */
+ map<string, string> properties = 2;
+}
+
message FetchClientConfResponse {
StatusCode status = 1;
string retMsg = 2;