This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d150f233e [CELEBORN-748] Rename RssHARetryClient to MasterClient
d150f233e is described below

commit d150f233e79b02f924ae310adb103377e027e2cb
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 29 16:47:15 2023 +0800

    [CELEBORN-748] Rename RssHARetryClient to MasterClient
    
    ### What changes were proposed in this pull request?
    
    Rename RssHARetryClient to MasterClient
    
    ### Why are the changes needed?
    
    Code refactor
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #1661 from AngersZhuuuu/CELEBORN-748.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 1fd8833756721291d62db5151a8b9ccb91ea2c66)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/celeborn/client/ShuffleClientImpl.java    |  5 ++---
 .../celeborn/client/ApplicationHeartbeater.scala     |  8 ++++----
 .../apache/celeborn/client/LifecycleManager.scala    | 18 +++++++++---------
 .../MasterClient.java}                               |  8 ++++----
 .../MasterNotLeaderException.java                    |  2 +-
 .../MasterClientSuiteJ.java}                         | 20 ++++++++++----------
 .../deploy/master/clustermeta/ha/HAHelper.java       |  2 +-
 .../master/clustermeta/ha/HAMasterMetaManager.java   |  4 ++--
 .../deploy/master/clustermeta/ha/HARaftServer.java   |  4 ++--
 .../celeborn/service/deploy/master/Master.scala      |  6 +++---
 .../master/clustermeta/DefaultMetaSystemSuiteJ.java  |  5 ++---
 .../ha/RatisMasterStatusSystemSuiteJ.java            |  5 ++---
 .../celeborn/service/deploy/worker/Worker.scala      | 18 +++++++++---------
 13 files changed, 51 insertions(+), 54 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c7c07a1ad..9927ea64a 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.client.compress.Compressor;
 import org.apache.celeborn.client.read.RssInputStream;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.network.TransportContext;
 import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
@@ -1496,8 +1496,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     if (isDriver) {
       try {
         driverRssMetaService.send(
-            UnregisterShuffle$.MODULE$.apply(
-                appUniqueId, shuffleId, RssHARetryClient.genRequestId()));
+            UnregisterShuffle$.MODULE$.apply(appUniqueId, shuffleId, MasterClient.genRequestId()));
       } catch (Exception e) {
         // If some exceptions need to be ignored, they shouldn't be logged as error-level,
         // otherwise it will mislead users.
diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index dd8a5d389..38dc2a47d 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration.DurationInt
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.haclient.RssHARetryClient
+import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID}
 import org.apache.celeborn.common.protocol.message.StatusCode
@@ -32,7 +32,7 @@ import org.apache.celeborn.common.util.{ThreadUtils, Utils}
 class ApplicationHeartbeater(
     appId: String,
     conf: CelebornConf,
-    rssHARetryClient: RssHARetryClient,
+    masterClient: MasterClient,
     shuffleMetrics: () => (Long, Long),
     workerStatusTracker: WorkerStatusTracker) extends Logging {
 
@@ -47,7 +47,7 @@ class ApplicationHeartbeater(
       new Runnable {
         override def run(): Unit = {
           try {
-            require(rssHARetryClient != null, "When sending a heartbeat, client shouldn't be null.")
+            require(masterClient != null, "When sending a heartbeat, client shouldn't be null.")
             val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
             logInfo("Send app heartbeat with " +
               s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount")
@@ -82,7 +82,7 @@ class ApplicationHeartbeater(
   private def requestHeartbeat(message: HeartbeatFromApplication)
       : HeartbeatFromApplicationResponse = {
     try {
-      rssHARetryClient.askSync[HeartbeatFromApplicationResponse](
+      masterClient.askSync[HeartbeatFromApplicationResponse](
         message,
         classOf[HeartbeatFromApplicationResponse])
     } catch {
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 3e43d7b72..06e1a3de8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers}
 import org.apache.celeborn.client.listener.WorkerStatusListener
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.haclient.RssHARetryClient
+import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}
@@ -118,14 +118,14 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
 
   logInfo(s"Starting LifecycleManager on ${rpcEnv.address}")
 
-  private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf)
+  private val masterClient = new MasterClient(rpcEnv, conf)
   val commitManager = new CommitManager(appUniqueId, conf, this)
   val workerStatusTracker = new WorkerStatusTracker(conf, this)
   private val heartbeater =
     new ApplicationHeartbeater(
       appUniqueId,
       conf,
-      rssHARetryClient,
+      masterClient,
       () => commitManager.commitMetrics(),
       workerStatusTracker)
   private val changePartitionManager = new ChangePartitionManager(conf, this)
@@ -168,7 +168,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     releasePartitionManager.stop()
     heartbeater.stop()
 
-    rssHARetryClient.close()
+    masterClient.close()
     if (rpcEnv != null) {
       rpcEnv.shutdown()
       rpcEnv.awaitTermination()
@@ -1023,7 +1023,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         commitManager.removeExpiredShuffle(shuffleId)
         changePartitionManager.removeExpiredShuffle(shuffleId)
         val unregisterShuffleResponse = requestMasterUnregisterShuffle(
-          UnregisterShuffle(appUniqueId, shuffleId, 
RssHARetryClient.genRequestId()))
+          UnregisterShuffle(appUniqueId, shuffleId, 
MasterClient.genRequestId()))
         // if unregister shuffle not success, wait next turn
         if (StatusCode.SUCCESS == 
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
           unregisterShuffleTime.remove(shuffleId)
@@ -1055,7 +1055,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private def requestMasterRequestSlots(message: RequestSlots): 
RequestSlotsResponse = {
     val shuffleKey = Utils.makeShuffleKey(message.applicationId, 
message.shuffleId)
     try {
-      rssHARetryClient.askSync[RequestSlotsResponse](message, 
classOf[RequestSlotsResponse])
+      masterClient.askSync[RequestSlotsResponse](message, 
classOf[RequestSlotsResponse])
     } catch {
       case e: Exception =>
         logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e)
@@ -1095,7 +1095,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
   private def requestMasterReleaseSlots(message: ReleaseSlots): 
ReleaseSlotsResponse = {
     try {
-      rssHARetryClient.askSync[ReleaseSlotsResponse](message, 
classOf[ReleaseSlotsResponse])
+      masterClient.askSync[ReleaseSlotsResponse](message, 
classOf[ReleaseSlotsResponse])
     } catch {
       case e: Exception =>
         logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
@@ -1106,7 +1106,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private def requestMasterUnregisterShuffle(message: PbUnregisterShuffle)
       : PbUnregisterShuffleResponse = {
     try {
-      rssHARetryClient.askSync[PbUnregisterShuffleResponse](
+      masterClient.askSync[PbUnregisterShuffleResponse](
         message,
         classOf[PbUnregisterShuffleResponse])
     } catch {
@@ -1118,7 +1118,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
   def checkQuota(): CheckQuotaResponse = {
     try {
-      rssHARetryClient.askSync[CheckQuotaResponse](
+      masterClient.askSync[CheckQuotaResponse](
         CheckQuota(userIdentifier),
         classOf[CheckQuotaResponse])
     } catch {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
 b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
similarity index 97%
rename from 
common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
rename to 
common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 360de2af0..364255241 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.common.haclient;
+package org.apache.celeborn.common.client;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -48,8 +48,8 @@ import org.apache.celeborn.common.rpc.RpcEnv;
 import org.apache.celeborn.common.rpc.RpcTimeout;
 import org.apache.celeborn.common.util.ThreadUtils;
 
-public class RssHARetryClient {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RssHARetryClient.class);
+public class MasterClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MasterClient.class);
 
   private final RpcEnv rpcEnv;
   private final String[] masterEndpoints;
@@ -60,7 +60,7 @@ public class RssHARetryClient {
   private final AtomicReference<RpcEndpointRef> rpcEndpointRef;
   private final ExecutorService oneWayMessageSender;
 
-  public RssHARetryClient(RpcEnv rpcEnv, CelebornConf conf) {
+  public MasterClient(RpcEnv rpcEnv, CelebornConf conf) {
     this.rpcEnv = rpcEnv;
     this.masterEndpoints = conf.masterEndpoints();
     this.maxRetries = Math.max(masterEndpoints.length, 
conf.masterClientMaxRetries());
diff --git 
a/common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java
 
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
similarity index 97%
rename from 
common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java
rename to 
common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index bbeeb1e95..10c3c24ea 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.common.haclient;
+package org.apache.celeborn.common.client;
 
 import java.io.IOException;
 
diff --git 
a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
similarity index 95%
rename from 
common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
rename to 
common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
index 165029b46..a902fd2a4 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.common.haclient;
+package org.apache.celeborn.common.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -46,8 +46,8 @@ import org.apache.celeborn.common.rpc.RpcAddress;
 import org.apache.celeborn.common.rpc.RpcEndpointRef;
 import org.apache.celeborn.common.rpc.RpcEnv;
 
-public class RssHARetryClientSuiteJ {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RssHARetryClientSuiteJ.class);
+public class MasterClientSuiteJ {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MasterClientSuiteJ.class);
 
   private final String masterHost = "localhost";
   private final int masterPort = 9097;
@@ -79,7 +79,7 @@ public class RssHARetryClientSuiteJ {
         });
     prepareForRpcEnvWithoutHA();
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromApplication message = 
Mockito.mock(HeartbeatFromApplication.class);
 
     try {
@@ -106,7 +106,7 @@ public class RssHARetryClientSuiteJ {
         });
     prepareForRpcEnvWithoutHA();
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromApplication message = 
Mockito.mock(HeartbeatFromApplication.class);
 
     try {
@@ -132,7 +132,7 @@ public class RssHARetryClientSuiteJ {
           return Future$.MODULE$.successful(response);
         });
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromApplication message = 
Mockito.mock(HeartbeatFromApplication.class);
 
     try {
@@ -152,7 +152,7 @@ public class RssHARetryClientSuiteJ {
     prepareForEndpointRefWithoutRetry(() -> 
Future$.MODULE$.successful(mockResponse));
     prepareForRpcEnvWithoutHA();
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
     HeartbeatFromWorkerResponse response = null;
@@ -174,7 +174,7 @@ public class RssHARetryClientSuiteJ {
     prepareForEndpointRefWithRetry(numTries, () -> 
Future$.MODULE$.successful(mockResponse));
     prepareForRpcEnvWithoutHA();
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
     HeartbeatFromWorkerResponse response = null;
@@ -195,7 +195,7 @@ public class RssHARetryClientSuiteJ {
 
     prepareForRpcEnvWithHA(() -> Future$.MODULE$.successful(mockResponse));
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
     HeartbeatFromWorkerResponse response = null;
@@ -254,7 +254,7 @@ public class RssHARetryClientSuiteJ {
         .when(rpcEnv)
         .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());
 
-    RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+    MasterClient client = new MasterClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
     HeartbeatFromWorkerResponse response = null;
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 570c09d53..527b5d557 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -26,8 +26,8 @@ import 
org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
+import org.apache.celeborn.common.client.MasterNotLeaderException;
 import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.haclient.MasterNotLeaderException;
 import org.apache.celeborn.common.rpc.RpcCallContext;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index ebf75b310..2a0cecf61 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -25,8 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.exception.CelebornRuntimeException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.AppDiskUsageMetric;
 import org.apache.celeborn.common.meta.DiskInfo;
@@ -318,7 +318,7 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
       ratisServer.submitRequest(
           ResourceRequest.newBuilder()
               .setCmdType(Type.UpdatePartitionSize)
-              .setRequestId(RssHARetryClient.genRequestId())
+              .setRequestId(MasterClient.genRequestId())
               .build());
     } catch (CelebornRuntimeException e) {
       LOG.error("Handle update partition size failed!", e);
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index cd89fca19..f20cfff16 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -51,8 +51,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.exception.CelebornRuntimeException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
 import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -198,7 +198,7 @@ public class HARaftServer {
   public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
       throws CelebornRuntimeException {
     String requestId = request.getRequestId();
-    Tuple2<String, Long> decoded = RssHARetryClient.decodeRequestId(requestId);
+    Tuple2<String, Long> decoded = MasterClient.decodeRequestId(requestId);
     if (decoded == null) {
       throw new CelebornRuntimeException(
           "RequestId:" + requestId + " invalid, should be: uuid#callId.");
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 475229aa9..1c1c1908d 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.util.Random
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.haclient.RssHARetryClient
+import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo}
@@ -369,7 +369,7 @@ private[celeborn] class Master(
           worker.pushPort,
           worker.fetchPort,
           worker.replicatePort,
-          RssHARetryClient.genRequestId()))
+          MasterClient.genRequestId()))
       }
       ind += 1
     }
@@ -384,7 +384,7 @@ private[celeborn] class Master(
     statusSystem.appHeartbeatTime.keySet().asScala.foreach { key =>
       if (statusSystem.appHeartbeatTime.get(key) < currentTime - 
appHeartbeatTimeoutMs) {
         logWarning(s"Application $key timeout, trigger applicationLost event.")
-        val requestId = RssHARetryClient.genRequestId()
+        val requestId = MasterClient.genRequestId()
         var res = self.askSync[ApplicationLostResponse](ApplicationLost(key, 
requestId))
         var retry = 1
         while (res.status != StatusCode.SUCCESS && retry <= 3) {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 1b6df8205..6d0ce6e21 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -30,7 +30,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
+import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
@@ -105,8 +105,7 @@ public class DefaultMetaSystemSuiteJ {
   public void tearDown() throws Exception {}
 
   private String getNewReqeustId() {
-    return RssHARetryClient.encodeRequestId(
-        UUID.randomUUID().toString(), callerId.incrementAndGet());
+    return MasterClient.encodeRequestId(UUID.randomUUID().toString(), 
callerId.incrementAndGet());
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index f2ab80996..85bb1db61 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -29,8 +29,8 @@ import org.junit.*;
 import org.mockito.Mockito;
 
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.exception.CelebornRuntimeException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
@@ -195,8 +195,7 @@ public class RatisMasterStatusSystemSuiteJ {
   private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
 
   private String getNewReqeustId() {
-    return RssHARetryClient.encodeRequestId(
-        UUID.randomUUID().toString(), callerId.incrementAndGet());
+    return MasterClient.encodeRequestId(UUID.randomUUID().toString(), 
callerId.incrementAndGet());
   }
 
   public HAMasterMetaManager pickLeaderStatusSystem() {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f6be07d6e..550adb05a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -30,8 +30,8 @@ import io.netty.util.HashedWheelTimer
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.CelebornConf._
+import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.exception.CelebornException
-import org.apache.celeborn.common.haclient.RssHARetryClient
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, 
WorkerPartitionLocationInfo}
@@ -219,7 +219,7 @@ private[celeborn] class Worker(
   val shuffleCommitInfos =
     JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long, 
CommitInfo]]()
 
-  private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf)
+  private val masterClient = new MasterClient(rpcEnv, conf)
 
   // (workerInfo -> last connect timeout timestamp)
   val unavailablePeers = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
@@ -285,7 +285,7 @@ private[celeborn] class Worker(
     val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
       storageManager.userResourceConsumptionSnapshot().asJava)
 
-    val response = rssHARetryClient.askSync[HeartbeatFromWorkerResponse](
+    val response = masterClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
         host,
         rpcPort,
@@ -392,7 +392,7 @@ private[celeborn] class Worker(
       }
       memoryManager.close();
 
-      rssHARetryClient.close()
+      masterClient.close()
       replicateServer.close()
       fetchServer.close()
 
@@ -410,7 +410,7 @@ private[celeborn] class Worker(
     while (registerTimeout > 0) {
       val resp =
         try {
-          rssHARetryClient.askSync[PbRegisterWorkerResponse](
+          masterClient.askSync[PbRegisterWorkerResponse](
             RegisterWorker(
               host,
               rpcPort,
@@ -422,7 +422,7 @@ private[celeborn] class Worker(
               workerInfo.diskInfos.asScala.toMap,
               workerInfo.updateThenGetUserResourceConsumption(
                 
storageManager.userResourceConsumptionSnapshot().asJava).asScala.toMap,
-              RssHARetryClient.genRequestId()),
+              MasterClient.genRequestId()),
             classOf[PbRegisterWorkerResponse])
         } catch {
           case throwable: Throwable =>
@@ -532,18 +532,18 @@ private[celeborn] class Worker(
         // make master remove this worker from excluded list.
         try {
           if (gracefulShutdown) {
-            rssHARetryClient.askSync(
+            masterClient.askSync(
               ReportWorkerUnavailable(List(workerInfo).asJava),
               OneWayMessageResponse.getClass)
           } else {
-            rssHARetryClient.askSync[PbWorkerLostResponse](
+            masterClient.askSync[PbWorkerLostResponse](
               WorkerLost(
                 host,
                 rpcPort,
                 pushPort,
                 fetchPort,
                 replicatePort,
-                RssHARetryClient.genRequestId()),
+                MasterClient.genRequestId()),
               classOf[PbWorkerLostResponse])
           }
         } catch {

Reply via email to