This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 44fad8d32 [#363] improvement(server): Make the coordinator client
managed by CoordinatorClientFactory singleton (#1377)
44fad8d32 is described below
commit 44fad8d327ac172453d51179817dfc40f0550513
Author: Qing <[email protected]>
AuthorDate: Mon Dec 25 10:09:24 2023 +0800
[#363] improvement(server): Make the coordinator client managed by
CoordinatorClientFactory singleton (#1377)
### What changes were proposed in this pull request?
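Turn CoordinatorClientFactory into a singleton (obtained via getInstance()) and create all coordinator clients through it; the client type is now passed to createCoordinatorClient on each call instead of being stored in the factory.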
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/363
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../apache/spark/shuffle/RssSparkShuffleUtils.java | 6 +++---
.../uniffle/client/impl/ShuffleWriteClientImpl.java | 5 +++--
.../org/apache/uniffle/test/AccessClusterTest.java | 4 ++--
.../apache/uniffle/test/CoordinatorTestBase.java | 5 +++--
.../uniffle/test/ShuffleServerOnRandomPortTest.java | 3 +++
.../client/factory/CoordinatorClientFactory.java | 21 ++++++++++++++-------
.../client/impl/grpc/CoordinatorGrpcClient.java | 8 +++++++-
.../apache/uniffle/server/RegisterHeartBeat.java | 7 ++++---
8 files changed, 39 insertions(+), 20 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index 0edef8f40..e846eb7f4 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -108,9 +108,9 @@ public class RssSparkShuffleUtils {
public static List<CoordinatorClient> createCoordinatorClients(SparkConf
sparkConf) {
String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
- CoordinatorClientFactory coordinatorClientFactory =
- new CoordinatorClientFactory(ClientType.valueOf(clientType));
- return coordinatorClientFactory.createCoordinatorClient(coordinators);
+ CoordinatorClientFactory coordinatorClientFactory =
CoordinatorClientFactory.getInstance();
+ return coordinatorClientFactory.createCoordinatorClient(
+ ClientType.valueOf(clientType), coordinators);
}
public static void applyDynamicClientConf(SparkConf sparkConf, Map<String,
String> confItems) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 6ad33e026..49be1d325 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -131,7 +131,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
this.clientType = builder.getClientType();
this.retryMax = builder.getRetryMax();
this.retryIntervalMax = builder.getRetryIntervalMax();
- this.coordinatorClientFactory = new
CoordinatorClientFactory(ClientType.valueOf(clientType));
+ this.coordinatorClientFactory = CoordinatorClientFactory.getInstance();
this.heartBeatExecutorService =
ThreadUtils.getDaemonFixedThreadPool(builder.getHeartBeatThreadNum(),
"client-heartbeat");
this.replica = builder.getReplica();
@@ -562,7 +562,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
@Override
public void registerCoordinators(String coordinators) {
List<CoordinatorClient> clients =
- coordinatorClientFactory.createCoordinatorClient(coordinators);
+ coordinatorClientFactory.createCoordinatorClient(
+ ClientType.valueOf(this.clientType), coordinators);
coordinatorClients.addAll(clients);
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index f25927021..aa56d4d56 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -173,8 +173,8 @@ public class AccessClusterTest extends CoordinatorTestBase {
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
CoordinatorClient client =
- new CoordinatorClientFactory(ClientType.GRPC)
- .createCoordinatorClient(LOCALHOST, COORDINATOR_PORT_1 + 13);
+ CoordinatorClientFactory.getInstance()
+ .createCoordinatorClient(ClientType.GRPC, LOCALHOST,
COORDINATOR_PORT_1 + 13);
request =
new RssAccessClusterRequest(
accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
"user");
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
index 7a3110a94..4c6def032 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
@@ -26,13 +26,14 @@ import org.apache.uniffle.common.ClientType;
public class CoordinatorTestBase extends IntegrationTestBase {
- protected CoordinatorClientFactory factory = new
CoordinatorClientFactory(ClientType.GRPC);
+ protected CoordinatorClientFactory factory =
CoordinatorClientFactory.getInstance();
protected CoordinatorGrpcClient coordinatorClient;
@BeforeEach
public void createClient() {
coordinatorClient =
- (CoordinatorGrpcClient) factory.createCoordinatorClient(LOCALHOST,
COORDINATOR_PORT_1);
+ (CoordinatorGrpcClient)
+ factory.createCoordinatorClient(ClientType.GRPC, LOCALHOST,
COORDINATOR_PORT_1);
}
@AfterEach
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
index 28d19065e..8f1ccbab7 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
@@ -36,14 +36,17 @@ public class ShuffleServerOnRandomPortTest extends
CoordinatorTestBase {
coordinatorConf.setLong("rss.coordinator.app.expired", 2000);
coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
createCoordinatorServer(coordinatorConf);
+
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setInteger("rss.server.netty.port", 0);
shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setInteger("rss.random.port.min", 30000);
shuffleServerConf.setInteger("rss.random.port.max", 40000);
createShuffleServer(shuffleServerConf);
+
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
createShuffleServer(shuffleServerConf);
+
startServers();
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index b1744dce6..c01cb8d76 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -33,13 +33,18 @@ import org.apache.uniffle.common.exception.RssException;
public class CoordinatorClientFactory {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorClientFactory.class);
- private ClientType clientType;
+ private CoordinatorClientFactory() {}
- public CoordinatorClientFactory(ClientType clientType) {
- this.clientType = clientType;
+ private static class LazyHolder {
+ static final CoordinatorClientFactory INSTANCE = new
CoordinatorClientFactory();
}
- public CoordinatorClient createCoordinatorClient(String host, int port) {
+ public static CoordinatorClientFactory getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ public synchronized CoordinatorClient createCoordinatorClient(
+ ClientType clientType, String host, int port) {
if (clientType.equals(ClientType.GRPC) ||
clientType.equals(ClientType.GRPC_NETTY)) {
return new CoordinatorGrpcClient(host, port);
} else {
@@ -47,9 +52,10 @@ public class CoordinatorClientFactory {
}
}
- public List<CoordinatorClient> createCoordinatorClient(String coordinators) {
+ public synchronized List<CoordinatorClient> createCoordinatorClient(
+ ClientType clientType, String coordinators) {
LOG.info("Start to create coordinator clients from {}", coordinators);
- List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
+
String[] coordinatorList = coordinators.trim().split(",");
if (coordinatorList.length == 0) {
String msg = "Invalid " + coordinators;
@@ -57,6 +63,7 @@ public class CoordinatorClientFactory {
throw new RssException(msg);
}
+ List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
for (String coordinator : coordinatorList) {
String[] ipPort = coordinator.trim().split(":");
if (ipPort.length != 2) {
@@ -67,7 +74,7 @@ public class CoordinatorClientFactory {
String host = ipPort[0];
int port = Integer.parseInt(ipPort[1]);
- CoordinatorClient coordinatorClient = createCoordinatorClient(host,
port);
+ CoordinatorClient coordinatorClient =
createCoordinatorClient(clientType, host, port);
coordinatorClients.add(coordinatorClient);
LOG.info("Add coordinator client {}", coordinatorClient.getDesc());
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index c3da5f484..728b184ec 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -90,6 +90,12 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
public CoordinatorGrpcClient(String host, int port, int maxRetryAttempts,
boolean usePlaintext) {
super(host, port, maxRetryAttempts, usePlaintext);
blockingStub = CoordinatorServerGrpc.newBlockingStub(channel);
+ LOG.info(
+ "Created CoordinatorGrpcClient, host:{}, port:{}, maxRetryAttempts:{},
usePlaintext:{}",
+ host,
+ port,
+ maxRetryAttempts,
+ usePlaintext);
}
public CoordinatorGrpcClient(ManagedChannel channel) {
@@ -145,7 +151,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
response = blockingStub.withDeadlineAfter(timeout,
TimeUnit.MILLISECONDS).heartbeat(request);
status = response.getStatus();
} catch (StatusRuntimeException e) {
- LOG.error(e.getMessage());
+ LOG.error("Failed to doSendHeartBeat, request: {}", request, e);
status = RssProtos.StatusCode.TIMEOUT;
} catch (Exception e) {
LOG.error(e.getMessage());
diff --git
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index f83cb7d2d..95b7b729c 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -57,9 +57,10 @@ public class RegisterHeartBeat {
this.heartBeatInitialDelay =
conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
this.heartBeatInterval =
conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
this.coordinatorQuorum =
conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
- CoordinatorClientFactory factory =
- new
CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
- this.coordinatorClients =
factory.createCoordinatorClient(this.coordinatorQuorum);
+ CoordinatorClientFactory factory = CoordinatorClientFactory.getInstance();
+ this.coordinatorClients =
+ factory.createCoordinatorClient(
+ conf.get(ShuffleServerConf.RSS_CLIENT_TYPE),
this.coordinatorQuorum);
this.shuffleServer = shuffleServer;
this.heartBeatExecutorService =
ThreadUtils.getDaemonFixedThreadPool(