This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 44fad8d32 [#363] improvement(server): Make the coordinator client 
managed by CoordinatorClientFactory singleton (#1377)
44fad8d32 is described below

commit 44fad8d327ac172453d51179817dfc40f0550513
Author: Qing <[email protected]>
AuthorDate: Mon Dec 25 10:09:24 2023 +0800

    [#363] improvement(server): Make the coordinator client managed by 
CoordinatorClientFactory singleton (#1377)
    
    ### What changes were proposed in this pull request?
    
    Make the coordinator client managed by the CoordinatorClientFactory singleton.
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/363
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
---
 .../apache/spark/shuffle/RssSparkShuffleUtils.java  |  6 +++---
 .../uniffle/client/impl/ShuffleWriteClientImpl.java |  5 +++--
 .../org/apache/uniffle/test/AccessClusterTest.java  |  4 ++--
 .../apache/uniffle/test/CoordinatorTestBase.java    |  5 +++--
 .../uniffle/test/ShuffleServerOnRandomPortTest.java |  3 +++
 .../client/factory/CoordinatorClientFactory.java    | 21 ++++++++++++++-------
 .../client/impl/grpc/CoordinatorGrpcClient.java     |  8 +++++++-
 .../apache/uniffle/server/RegisterHeartBeat.java    |  7 ++++---
 8 files changed, 39 insertions(+), 20 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index 0edef8f40..e846eb7f4 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -108,9 +108,9 @@ public class RssSparkShuffleUtils {
   public static List<CoordinatorClient> createCoordinatorClients(SparkConf 
sparkConf) {
     String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
-    CoordinatorClientFactory coordinatorClientFactory =
-        new CoordinatorClientFactory(ClientType.valueOf(clientType));
-    return coordinatorClientFactory.createCoordinatorClient(coordinators);
+    CoordinatorClientFactory coordinatorClientFactory = 
CoordinatorClientFactory.getInstance();
+    return coordinatorClientFactory.createCoordinatorClient(
+        ClientType.valueOf(clientType), coordinators);
   }
 
   public static void applyDynamicClientConf(SparkConf sparkConf, Map<String, 
String> confItems) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 6ad33e026..49be1d325 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -131,7 +131,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     this.clientType = builder.getClientType();
     this.retryMax = builder.getRetryMax();
     this.retryIntervalMax = builder.getRetryIntervalMax();
-    this.coordinatorClientFactory = new 
CoordinatorClientFactory(ClientType.valueOf(clientType));
+    this.coordinatorClientFactory = CoordinatorClientFactory.getInstance();
     this.heartBeatExecutorService =
         ThreadUtils.getDaemonFixedThreadPool(builder.getHeartBeatThreadNum(), 
"client-heartbeat");
     this.replica = builder.getReplica();
@@ -562,7 +562,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
   @Override
   public void registerCoordinators(String coordinators) {
     List<CoordinatorClient> clients =
-        coordinatorClientFactory.createCoordinatorClient(coordinators);
+        coordinatorClientFactory.createCoordinatorClient(
+            ClientType.valueOf(this.clientType), coordinators);
     coordinatorClients.addAll(clients);
   }
 
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index f25927021..aa56d4d56 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -173,8 +173,8 @@ public class AccessClusterTest extends CoordinatorTestBase {
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
 
     CoordinatorClient client =
-        new CoordinatorClientFactory(ClientType.GRPC)
-            .createCoordinatorClient(LOCALHOST, COORDINATOR_PORT_1 + 13);
+        CoordinatorClientFactory.getInstance()
+            .createCoordinatorClient(ClientType.GRPC, LOCALHOST, 
COORDINATOR_PORT_1 + 13);
     request =
         new RssAccessClusterRequest(
             accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, 
"user");
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
index 7a3110a94..4c6def032 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
@@ -26,13 +26,14 @@ import org.apache.uniffle.common.ClientType;
 
 public class CoordinatorTestBase extends IntegrationTestBase {
 
-  protected CoordinatorClientFactory factory = new 
CoordinatorClientFactory(ClientType.GRPC);
+  protected CoordinatorClientFactory factory = 
CoordinatorClientFactory.getInstance();
   protected CoordinatorGrpcClient coordinatorClient;
 
   @BeforeEach
   public void createClient() {
     coordinatorClient =
-        (CoordinatorGrpcClient) factory.createCoordinatorClient(LOCALHOST, 
COORDINATOR_PORT_1);
+        (CoordinatorGrpcClient)
+            factory.createCoordinatorClient(ClientType.GRPC, LOCALHOST, 
COORDINATOR_PORT_1);
   }
 
   @AfterEach
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
index 28d19065e..8f1ccbab7 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
@@ -36,14 +36,17 @@ public class ShuffleServerOnRandomPortTest extends 
CoordinatorTestBase {
     coordinatorConf.setLong("rss.coordinator.app.expired", 2000);
     coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
     createCoordinatorServer(coordinatorConf);
+
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
     shuffleServerConf.setInteger("rss.server.netty.port", 0);
     shuffleServerConf.setInteger("rss.rpc.server.port", 0);
     shuffleServerConf.setInteger("rss.random.port.min", 30000);
     shuffleServerConf.setInteger("rss.random.port.max", 40000);
     createShuffleServer(shuffleServerConf);
+
     shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
     createShuffleServer(shuffleServerConf);
+
     startServers();
   }
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index b1744dce6..c01cb8d76 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -33,13 +33,18 @@ import org.apache.uniffle.common.exception.RssException;
 public class CoordinatorClientFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorClientFactory.class);
 
-  private ClientType clientType;
+  private CoordinatorClientFactory() {}
 
-  public CoordinatorClientFactory(ClientType clientType) {
-    this.clientType = clientType;
+  private static class LazyHolder {
+    static final CoordinatorClientFactory INSTANCE = new 
CoordinatorClientFactory();
   }
 
-  public CoordinatorClient createCoordinatorClient(String host, int port) {
+  public static CoordinatorClientFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  public synchronized CoordinatorClient createCoordinatorClient(
+      ClientType clientType, String host, int port) {
     if (clientType.equals(ClientType.GRPC) || 
clientType.equals(ClientType.GRPC_NETTY)) {
       return new CoordinatorGrpcClient(host, port);
     } else {
@@ -47,9 +52,10 @@ public class CoordinatorClientFactory {
     }
   }
 
-  public List<CoordinatorClient> createCoordinatorClient(String coordinators) {
+  public synchronized List<CoordinatorClient> createCoordinatorClient(
+      ClientType clientType, String coordinators) {
     LOG.info("Start to create coordinator clients from {}", coordinators);
-    List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
+
     String[] coordinatorList = coordinators.trim().split(",");
     if (coordinatorList.length == 0) {
       String msg = "Invalid " + coordinators;
@@ -57,6 +63,7 @@ public class CoordinatorClientFactory {
       throw new RssException(msg);
     }
 
+    List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
     for (String coordinator : coordinatorList) {
       String[] ipPort = coordinator.trim().split(":");
       if (ipPort.length != 2) {
@@ -67,7 +74,7 @@ public class CoordinatorClientFactory {
 
       String host = ipPort[0];
       int port = Integer.parseInt(ipPort[1]);
-      CoordinatorClient coordinatorClient = createCoordinatorClient(host, 
port);
+      CoordinatorClient coordinatorClient = 
createCoordinatorClient(clientType, host, port);
       coordinatorClients.add(coordinatorClient);
       LOG.info("Add coordinator client {}", coordinatorClient.getDesc());
     }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index c3da5f484..728b184ec 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -90,6 +90,12 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
   public CoordinatorGrpcClient(String host, int port, int maxRetryAttempts, 
boolean usePlaintext) {
     super(host, port, maxRetryAttempts, usePlaintext);
     blockingStub = CoordinatorServerGrpc.newBlockingStub(channel);
+    LOG.info(
+        "Created CoordinatorGrpcClient, host:{}, port:{}, maxRetryAttempts:{}, 
usePlaintext:{}",
+        host,
+        port,
+        maxRetryAttempts,
+        usePlaintext);
   }
 
   public CoordinatorGrpcClient(ManagedChannel channel) {
@@ -145,7 +151,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       response = blockingStub.withDeadlineAfter(timeout, 
TimeUnit.MILLISECONDS).heartbeat(request);
       status = response.getStatus();
     } catch (StatusRuntimeException e) {
-      LOG.error(e.getMessage());
+      LOG.error("Failed to doSendHeartBeat, request: {}", request, e);
       status = RssProtos.StatusCode.TIMEOUT;
     } catch (Exception e) {
       LOG.error(e.getMessage());
diff --git 
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java 
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index f83cb7d2d..95b7b729c 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -57,9 +57,10 @@ public class RegisterHeartBeat {
     this.heartBeatInitialDelay = 
conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
     this.heartBeatInterval = 
conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
     this.coordinatorQuorum = 
conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
-    CoordinatorClientFactory factory =
-        new 
CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
-    this.coordinatorClients = 
factory.createCoordinatorClient(this.coordinatorQuorum);
+    CoordinatorClientFactory factory = CoordinatorClientFactory.getInstance();
+    this.coordinatorClients =
+        factory.createCoordinatorClient(
+            conf.get(ShuffleServerConf.RSS_CLIENT_TYPE), 
this.coordinatorQuorum);
     this.shuffleServer = shuffleServer;
     this.heartBeatExecutorService =
         ThreadUtils.getDaemonFixedThreadPool(

Reply via email to