This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d150f233e [CELEBORN-748] Rename RssHARetryClient to MasterClient
d150f233e is described below
commit d150f233e79b02f924ae310adb103377e027e2cb
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 29 16:47:15 2023 +0800
[CELEBORN-748] Rename RssHARetryClient to MasterClient
### What changes were proposed in this pull request?
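Rename RssHARetryClient to MasterClient and move it, together with
MasterNotLeaderException, from package org.apache.celeborn.common.haclient to
org.apache.celeborn.common.client. The test suite RssHARetryClientSuiteJ is
renamed to MasterClientSuiteJ, and all call sites are updated accordingly.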
### Why are the changes needed?
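Code refactoring: this is a naming cleanup only (class, package, field and
import renames); no functional change is intended.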
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA (GitHub Actions CI).
Closes #1661 from AngersZhuuuu/CELEBORN-748.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 1fd8833756721291d62db5151a8b9ccb91ea2c66)
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/celeborn/client/ShuffleClientImpl.java | 5 ++---
.../celeborn/client/ApplicationHeartbeater.scala | 8 ++++----
.../apache/celeborn/client/LifecycleManager.scala | 18 +++++++++---------
.../MasterClient.java} | 8 ++++----
.../MasterNotLeaderException.java | 2 +-
.../MasterClientSuiteJ.java} | 20 ++++++++++----------
.../deploy/master/clustermeta/ha/HAHelper.java | 2 +-
.../master/clustermeta/ha/HAMasterMetaManager.java | 4 ++--
.../deploy/master/clustermeta/ha/HARaftServer.java | 4 ++--
.../celeborn/service/deploy/master/Master.scala | 6 +++---
.../master/clustermeta/DefaultMetaSystemSuiteJ.java | 5 ++---
.../ha/RatisMasterStatusSystemSuiteJ.java | 5 ++---
.../celeborn/service/deploy/worker/Worker.scala | 18 +++++++++---------
13 files changed, 51 insertions(+), 54 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c7c07a1ad..9927ea64a 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.compress.Compressor;
import org.apache.celeborn.client.read.RssInputStream;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
@@ -1496,8 +1496,7 @@ public class ShuffleClientImpl extends ShuffleClient {
if (isDriver) {
try {
driverRssMetaService.send(
- UnregisterShuffle$.MODULE$.apply(
- appUniqueId, shuffleId, RssHARetryClient.genRequestId()));
+ UnregisterShuffle$.MODULE$.apply(appUniqueId, shuffleId,
MasterClient.genRequestId()));
} catch (Exception e) {
// If some exceptions need to be ignored, they shouldn't be logged as
error-level,
// otherwise it will mislead users.
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index dd8a5d389..38dc2a47d 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.haclient.RssHARetryClient
+import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
import
org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
@@ -32,7 +32,7 @@ import org.apache.celeborn.common.util.{ThreadUtils, Utils}
class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
- rssHARetryClient: RssHARetryClient,
+ masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
workerStatusTracker: WorkerStatusTracker) extends Logging {
@@ -47,7 +47,7 @@ class ApplicationHeartbeater(
new Runnable {
override def run(): Unit = {
try {
- require(rssHARetryClient != null, "When sending a heartbeat,
client shouldn't be null.")
+ require(masterClient != null, "When sending a heartbeat, client
shouldn't be null.")
val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count:
$tmpTotalFileCount")
@@ -82,7 +82,7 @@ class ApplicationHeartbeater(
private def requestHeartbeat(message: HeartbeatFromApplication)
: HeartbeatFromApplicationResponse = {
try {
- rssHARetryClient.askSync[HeartbeatFromApplicationResponse](
+ masterClient.askSync[HeartbeatFromApplicationResponse](
message,
classOf[HeartbeatFromApplicationResponse])
} catch {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 3e43d7b72..06e1a3de8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers}
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.haclient.RssHARetryClient
+import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo,
WorkerInfo}
@@ -118,14 +118,14 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
logInfo(s"Starting LifecycleManager on ${rpcEnv.address}")
- private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf)
+ private val masterClient = new MasterClient(rpcEnv, conf)
val commitManager = new CommitManager(appUniqueId, conf, this)
val workerStatusTracker = new WorkerStatusTracker(conf, this)
private val heartbeater =
new ApplicationHeartbeater(
appUniqueId,
conf,
- rssHARetryClient,
+ masterClient,
() => commitManager.commitMetrics(),
workerStatusTracker)
private val changePartitionManager = new ChangePartitionManager(conf, this)
@@ -168,7 +168,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
releasePartitionManager.stop()
heartbeater.stop()
- rssHARetryClient.close()
+ masterClient.close()
if (rpcEnv != null) {
rpcEnv.shutdown()
rpcEnv.awaitTermination()
@@ -1023,7 +1023,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
val unregisterShuffleResponse = requestMasterUnregisterShuffle(
- UnregisterShuffle(appUniqueId, shuffleId,
RssHARetryClient.genRequestId()))
+ UnregisterShuffle(appUniqueId, shuffleId,
MasterClient.genRequestId()))
// if unregister shuffle not success, wait next turn
if (StatusCode.SUCCESS ==
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
unregisterShuffleTime.remove(shuffleId)
@@ -1055,7 +1055,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private def requestMasterRequestSlots(message: RequestSlots):
RequestSlotsResponse = {
val shuffleKey = Utils.makeShuffleKey(message.applicationId,
message.shuffleId)
try {
- rssHARetryClient.askSync[RequestSlotsResponse](message,
classOf[RequestSlotsResponse])
+ masterClient.askSync[RequestSlotsResponse](message,
classOf[RequestSlotsResponse])
} catch {
case e: Exception =>
logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e)
@@ -1095,7 +1095,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private def requestMasterReleaseSlots(message: ReleaseSlots):
ReleaseSlotsResponse = {
try {
- rssHARetryClient.askSync[ReleaseSlotsResponse](message,
classOf[ReleaseSlotsResponse])
+ masterClient.askSync[ReleaseSlotsResponse](message,
classOf[ReleaseSlotsResponse])
} catch {
case e: Exception =>
logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
@@ -1106,7 +1106,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private def requestMasterUnregisterShuffle(message: PbUnregisterShuffle)
: PbUnregisterShuffleResponse = {
try {
- rssHARetryClient.askSync[PbUnregisterShuffleResponse](
+ masterClient.askSync[PbUnregisterShuffleResponse](
message,
classOf[PbUnregisterShuffleResponse])
} catch {
@@ -1118,7 +1118,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
def checkQuota(): CheckQuotaResponse = {
try {
- rssHARetryClient.askSync[CheckQuotaResponse](
+ masterClient.askSync[CheckQuotaResponse](
CheckQuota(userIdentifier),
classOf[CheckQuotaResponse])
} catch {
diff --git
a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
similarity index 97%
rename from
common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
rename to
common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 360de2af0..364255241 100644
---
a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.celeborn.common.haclient;
+package org.apache.celeborn.common.client;
import java.io.IOException;
import java.util.UUID;
@@ -48,8 +48,8 @@ import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.rpc.RpcTimeout;
import org.apache.celeborn.common.util.ThreadUtils;
-public class RssHARetryClient {
- private static final Logger LOG =
LoggerFactory.getLogger(RssHARetryClient.class);
+public class MasterClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(MasterClient.class);
private final RpcEnv rpcEnv;
private final String[] masterEndpoints;
@@ -60,7 +60,7 @@ public class RssHARetryClient {
private final AtomicReference<RpcEndpointRef> rpcEndpointRef;
private final ExecutorService oneWayMessageSender;
- public RssHARetryClient(RpcEnv rpcEnv, CelebornConf conf) {
+ public MasterClient(RpcEnv rpcEnv, CelebornConf conf) {
this.rpcEnv = rpcEnv;
this.masterEndpoints = conf.masterEndpoints();
this.maxRetries = Math.max(masterEndpoints.length,
conf.masterClientMaxRetries());
diff --git
a/common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
similarity index 97%
rename from
common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java
rename to
common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
index bbeeb1e95..10c3c24ea 100644
---
a/common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java
+++
b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.celeborn.common.haclient;
+package org.apache.celeborn.common.client;
import java.io.IOException;
diff --git
a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
similarity index 95%
rename from
common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
rename to
common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
index 165029b46..a902fd2a4 100644
---
a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.celeborn.common.haclient;
+package org.apache.celeborn.common.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -46,8 +46,8 @@ import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.rpc.RpcEnv;
-public class RssHARetryClientSuiteJ {
- private static final Logger LOG =
LoggerFactory.getLogger(RssHARetryClientSuiteJ.class);
+public class MasterClientSuiteJ {
+ private static final Logger LOG =
LoggerFactory.getLogger(MasterClientSuiteJ.class);
private final String masterHost = "localhost";
private final int masterPort = 9097;
@@ -79,7 +79,7 @@ public class RssHARetryClientSuiteJ {
});
prepareForRpcEnvWithoutHA();
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromApplication message =
Mockito.mock(HeartbeatFromApplication.class);
try {
@@ -106,7 +106,7 @@ public class RssHARetryClientSuiteJ {
});
prepareForRpcEnvWithoutHA();
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromApplication message =
Mockito.mock(HeartbeatFromApplication.class);
try {
@@ -132,7 +132,7 @@ public class RssHARetryClientSuiteJ {
return Future$.MODULE$.successful(response);
});
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromApplication message =
Mockito.mock(HeartbeatFromApplication.class);
try {
@@ -152,7 +152,7 @@ public class RssHARetryClientSuiteJ {
prepareForEndpointRefWithoutRetry(() ->
Future$.MODULE$.successful(mockResponse));
prepareForRpcEnvWithoutHA();
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
@@ -174,7 +174,7 @@ public class RssHARetryClientSuiteJ {
prepareForEndpointRefWithRetry(numTries, () ->
Future$.MODULE$.successful(mockResponse));
prepareForRpcEnvWithoutHA();
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
@@ -195,7 +195,7 @@ public class RssHARetryClientSuiteJ {
prepareForRpcEnvWithHA(() -> Future$.MODULE$.successful(mockResponse));
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
@@ -254,7 +254,7 @@ public class RssHARetryClientSuiteJ {
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());
- RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
+ MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 570c09d53..527b5d557 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -26,8 +26,8 @@ import
org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.celeborn.common.client.MasterNotLeaderException;
import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.haclient.MasterNotLeaderException;
import org.apache.celeborn.common.rpc.RpcCallContext;
import
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index ebf75b310..2a0cecf61 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -25,8 +25,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.AppDiskUsageMetric;
import org.apache.celeborn.common.meta.DiskInfo;
@@ -318,7 +318,7 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.UpdatePartitionSize)
- .setRequestId(RssHARetryClient.genRequestId())
+ .setRequestId(MasterClient.genRequestId())
.build());
} catch (CelebornRuntimeException e) {
LOG.error("Handle update partition size failed!", e);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index cd89fca19..f20cfff16 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -51,8 +51,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -198,7 +198,7 @@ public class HARaftServer {
public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
throws CelebornRuntimeException {
String requestId = request.getRequestId();
- Tuple2<String, Long> decoded = RssHARetryClient.decodeRequestId(requestId);
+ Tuple2<String, Long> decoded = MasterClient.decodeRequestId(requestId);
if (decoded == null) {
throw new CelebornRuntimeException(
"RequestId:" + requestId + " invalid, should be: uuid#callId.");
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 475229aa9..1c1c1908d 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.haclient.RssHARetryClient
+import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo}
@@ -369,7 +369,7 @@ private[celeborn] class Master(
worker.pushPort,
worker.fetchPort,
worker.replicatePort,
- RssHARetryClient.genRequestId()))
+ MasterClient.genRequestId()))
}
ind += 1
}
@@ -384,7 +384,7 @@ private[celeborn] class Master(
statusSystem.appHeartbeatTime.keySet().asScala.foreach { key =>
if (statusSystem.appHeartbeatTime.get(key) < currentTime -
appHeartbeatTimeoutMs) {
logWarning(s"Application $key timeout, trigger applicationLost event.")
- val requestId = RssHARetryClient.genRequestId()
+ val requestId = MasterClient.genRequestId()
var res = self.askSync[ApplicationLostResponse](ApplicationLost(key,
requestId))
var retry = 1
while (res.status != StatusCode.SUCCESS && retry <= 3) {
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 1b6df8205..6d0ce6e21 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -30,7 +30,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
+import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
@@ -105,8 +105,7 @@ public class DefaultMetaSystemSuiteJ {
public void tearDown() throws Exception {}
private String getNewReqeustId() {
- return RssHARetryClient.encodeRequestId(
- UUID.randomUUID().toString(), callerId.incrementAndGet());
+ return MasterClient.encodeRequestId(UUID.randomUUID().toString(),
callerId.incrementAndGet());
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index f2ab80996..85bb1db61 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -29,8 +29,8 @@ import org.junit.*;
import org.mockito.Mockito;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
-import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
@@ -195,8 +195,7 @@ public class RatisMasterStatusSystemSuiteJ {
private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
private String getNewReqeustId() {
- return RssHARetryClient.encodeRequestId(
- UUID.randomUUID().toString(), callerId.incrementAndGet());
+ return MasterClient.encodeRequestId(UUID.randomUUID().toString(),
callerId.incrementAndGet());
}
public HAMasterMetaManager pickLeaderStatusSystem() {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f6be07d6e..550adb05a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -30,8 +30,8 @@ import io.netty.util.HashedWheelTimer
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf._
+import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.exception.CelebornException
-import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo,
WorkerPartitionLocationInfo}
@@ -219,7 +219,7 @@ private[celeborn] class Worker(
val shuffleCommitInfos =
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long,
CommitInfo]]()
- private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf)
+ private val masterClient = new MasterClient(rpcEnv, conf)
// (workerInfo -> last connect timeout timestamp)
val unavailablePeers = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
@@ -285,7 +285,7 @@ private[celeborn] class Worker(
val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava)
- val response = rssHARetryClient.askSync[HeartbeatFromWorkerResponse](
+ val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
host,
rpcPort,
@@ -392,7 +392,7 @@ private[celeborn] class Worker(
}
memoryManager.close();
- rssHARetryClient.close()
+ masterClient.close()
replicateServer.close()
fetchServer.close()
@@ -410,7 +410,7 @@ private[celeborn] class Worker(
while (registerTimeout > 0) {
val resp =
try {
- rssHARetryClient.askSync[PbRegisterWorkerResponse](
+ masterClient.askSync[PbRegisterWorkerResponse](
RegisterWorker(
host,
rpcPort,
@@ -422,7 +422,7 @@ private[celeborn] class Worker(
workerInfo.diskInfos.asScala.toMap,
workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava).asScala.toMap,
- RssHARetryClient.genRequestId()),
+ MasterClient.genRequestId()),
classOf[PbRegisterWorkerResponse])
} catch {
case throwable: Throwable =>
@@ -532,18 +532,18 @@ private[celeborn] class Worker(
// make master remove this worker from excluded list.
try {
if (gracefulShutdown) {
- rssHARetryClient.askSync(
+ masterClient.askSync(
ReportWorkerUnavailable(List(workerInfo).asJava),
OneWayMessageResponse.getClass)
} else {
- rssHARetryClient.askSync[PbWorkerLostResponse](
+ masterClient.askSync[PbWorkerLostResponse](
WorkerLost(
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
- RssHARetryClient.genRequestId()),
+ MasterClient.genRequestId()),
classOf[PbWorkerLostResponse])
}
} catch {