This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ed2c00e3 [ISSUE-300] Make config type of RSS_CLIENT_TYPE as enum
(#310)
ed2c00e3 is described below
commit ed2c00e32f62bb7e46e757b76f084bd456a3c022
Author: Leping Huang <[email protected]>
AuthorDate: Mon Nov 28 09:55:03 2022 +0800
[ISSUE-300] Make config type of RSS_CLIENT_TYPE as enum (#310)
### What changes were proposed in this pull request?
1. Move ClientType.java from the internal-client module to the common module
2. Change RSS_CLIENT_TYPE in RssBaseConf.java from stringType to enumType
### Why are the changes needed?
This makes it easier to add different extensions (see #300).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests have already been added.
Co-authored-by: roryqi <[email protected]>
---
.../main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java | 3 ++-
.../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 3 ++-
.../src/main/java/org/apache/uniffle/common}/ClientType.java | 2 +-
.../main/java/org/apache/uniffle/common/config/RssBaseConf.java | 8 +++++---
.../src/test/java/org/apache/uniffle/test/AccessClusterTest.java | 3 ++-
.../test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java | 2 +-
.../java/org/apache/uniffle/test/CoordinatorAssignmentTest.java | 2 +-
.../test/java/org/apache/uniffle/test/CoordinatorTestBase.java | 3 ++-
.../common/src/test/java/org/apache/uniffle/test/QuorumTest.java | 2 +-
.../java/org/apache/uniffle/test/ShuffleWithRssClientTest.java | 2 +-
.../apache/uniffle/client/factory/CoordinatorClientFactory.java | 8 ++++----
.../apache/uniffle/client/factory/ShuffleServerClientFactory.java | 2 +-
.../main/java/org/apache/uniffle/server/RegisterHeartBeat.java | 2 +-
.../org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java | 2 +-
14 files changed, 25 insertions(+), 19 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index b7409687..6b4cc29f 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
@@ -77,7 +78,7 @@ public class RssSparkShuffleUtils {
public static List<CoordinatorClient> createCoordinatorClients(SparkConf
sparkConf) throws RuntimeException {
String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
- CoordinatorClientFactory coordinatorClientFactory = new
CoordinatorClientFactory(clientType);
+ CoordinatorClientFactory coordinatorClientFactory = new
CoordinatorClientFactory(ClientType.valueOf(clientType));
return coordinatorClientFactory.createCoordinatorClient(coordinators);
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index def845fd..1d76a41a 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -75,6 +75,7 @@ import
org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientUtils;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -121,7 +122,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
this.clientType = clientType;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
- this.coordinatorClientFactory = new CoordinatorClientFactory(clientType);
+ this.coordinatorClientFactory = new
CoordinatorClientFactory(ClientType.valueOf(clientType));
this.heartBeatExecutorService =
Executors.newFixedThreadPool(heartBeatThreadNum,
ThreadUtils.getThreadFactory("client-heartbeat-%d"));
this.replica = replica;
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/util/ClientType.java
b/common/src/main/java/org/apache/uniffle/common/ClientType.java
similarity index 95%
rename from
internal-client/src/main/java/org/apache/uniffle/client/util/ClientType.java
rename to common/src/main/java/org/apache/uniffle/common/ClientType.java
index d63c1286..7feccf4a 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/util/ClientType.java
+++ b/common/src/main/java/org/apache/uniffle/common/ClientType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.client.util;
+package org.apache.uniffle.common;
public enum ClientType {
GRPC
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index cd895ccd..dee1d062 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -20,6 +20,8 @@ package org.apache.uniffle.common.config;
import java.util.List;
import java.util.Map;
+import org.apache.uniffle.common.ClientType;
+
public class RssBaseConf extends RssConf {
public static final ConfigOption<String> RSS_COORDINATOR_QUORUM =
ConfigOptions
@@ -120,10 +122,10 @@ public class RssBaseConf extends RssConf {
.defaultValue(1024L * 1024L * 1024L)
.withDescription("Max size of rpc message (byte)");
- public static final ConfigOption<String> RSS_CLIENT_TYPE = ConfigOptions
+ public static final ConfigOption<ClientType> RSS_CLIENT_TYPE = ConfigOptions
.key("rss.rpc.client.type")
- .stringType()
- .defaultValue("GRPC")
+ .enumType(ClientType.class)
+ .defaultValue(ClientType.GRPC)
.withDescription("client type for rss");
public static final ConfigOption<String> RSS_STORAGE_TYPE = ConfigOptions
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 4ee5b651..d65bf00b 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -37,6 +37,7 @@ import
org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessCheckResult;
import org.apache.uniffle.coordinator.AccessChecker;
@@ -153,7 +154,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
shuffleServer.start();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
- CoordinatorClient client = new CoordinatorClientFactory("GRPC")
+ CoordinatorClient client = new CoordinatorClientFactory(ClientType.GRPC)
.createCoordinatorClient(LOCALHOST, COORDINATOR_PORT_1 + 13);
request = new RssAccessClusterRequest(accessId,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index f54e6050..bf28994f 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
-import org.apache.uniffle.client.util.ClientType;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index 234ae4cd..a767a543 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
-import org.apache.uniffle.client.util.ClientType;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.CoordinatorConf;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
index 5592a2e7..7a3110a9 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
@@ -22,10 +22,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
+import org.apache.uniffle.common.ClientType;
public class CoordinatorTestBase extends IntegrationTestBase {
- protected CoordinatorClientFactory factory = new
CoordinatorClientFactory("GRPC");
+ protected CoordinatorClientFactory factory = new
CoordinatorClientFactory(ClientType.GRPC);
protected CoordinatorGrpcClient coordinatorClient;
@BeforeEach
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 7379e8f6..dd339d7a 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -37,8 +37,8 @@ import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index ca62f7bf..a4e505ec 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -35,9 +35,9 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index 0c17cdad..b9f23058 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -27,19 +27,19 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
-import org.apache.uniffle.client.util.ClientType;
+import org.apache.uniffle.common.ClientType;
public class CoordinatorClientFactory {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorClientFactory.class);
- private String clientType;
+ private ClientType clientType;
- public CoordinatorClientFactory(String clientType) {
+ public CoordinatorClientFactory(ClientType clientType) {
this.clientType = clientType;
}
public CoordinatorClient createCoordinatorClient(String host, int port) {
- if (clientType.equalsIgnoreCase(ClientType.GRPC.name())) {
+ if (clientType.equals(ClientType.GRPC)) {
return new CoordinatorGrpcClient(host, port);
} else {
throw new UnsupportedOperationException("Unsupported client type " +
clientType);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 3c36c4bb..4337b855 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -23,7 +23,7 @@ import com.google.common.collect.Maps;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
-import org.apache.uniffle.client.util.ClientType;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
public class ShuffleServerClientFactory {
diff --git
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 07a6c9d4..f815a02e 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -58,7 +58,7 @@ public class RegisterHeartBeat {
this.heartBeatTimeout =
conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT);
this.coordinatorQuorum =
conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
CoordinatorClientFactory factory =
- new
CoordinatorClientFactory(conf.getString(ShuffleServerConf.RSS_CLIENT_TYPE));
+ new
CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
this.coordinatorClients =
factory.createCoordinatorClient(this.coordinatorQuorum);
this.shuffleServer = shuffleServer;
this.heartBeatExecutorService = Executors.newFixedThreadPool(
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index dbe4ac3e..ef0fbe47 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -24,7 +24,7 @@ import java.util.stream.Collectors;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
-import org.apache.uniffle.client.util.ClientType;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;