This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1a53db22c [CELEBORN-739] Rename HeartbeatResponse to HeartbeatFromWorkerResponse
1a53db22c is described below
commit 1a53db22cec1a1b393949898857d2301a9dbadb8
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 29 13:08:08 2023 +0800
[CELEBORN-739] Rename HeartbeatResponse to HeartbeatFromWorkerResponse
### What changes were proposed in this pull request?
Rename HeartbeatResponse to HeartbeatFromWorkerResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1651 from AngersZhuuuu/CELEBORN-739.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 2 +-
.../common/protocol/message/ControlMessages.scala | 15 ++++++++-------
.../common/haclient/RssHARetryClientSuiteJ.java | 21 +++++++++++----------
.../celeborn/service/deploy/master/Master.scala | 2 +-
.../celeborn/service/deploy/worker/Worker.scala | 4 ++--
5 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 4b1137919..4cda0995e 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -148,7 +148,7 @@ message PbHeartbeatFromWorker {
map<string, int64> estimatedAppDiskUsage = 10;
}
-message PbHeartbeatResponse {
+message PbHeartbeatFromWorkerResponse {
repeated string expiredShuffleKeys = 1;
bool registered = 2;
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 43034aada..5e5b785a8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -114,7 +114,7 @@ object ControlMessages extends Logging {
estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
override var requestId: String = ZERO_UUID) extends MasterRequestMessage
- case class HeartbeatResponse(
+ case class HeartbeatFromWorkerResponse(
expiredShuffleKeys: util.HashSet[String],
registered: Boolean) extends MasterMessage
@@ -461,8 +461,8 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload)
- case HeartbeatResponse(expiredShuffleKeys, registered) =>
- val payload = PbHeartbeatResponse.newBuilder()
+ case HeartbeatFromWorkerResponse(expiredShuffleKeys, registered) =>
+ val payload = PbHeartbeatFromWorkerResponse.newBuilder()
.addAllExpiredShuffleKeys(expiredShuffleKeys)
.setRegistered(registered)
.build().toByteArray
@@ -816,12 +816,13 @@ object ControlMessages extends Logging {
pbHeartbeatFromWorker.getRequestId)
case HEARTBEAT_RESPONSE =>
- val pbHeartbeatResponse = PbHeartbeatResponse.parseFrom(message.getPayload)
+ val pbHeartbeatFromWorkerResponse =
+ PbHeartbeatFromWorkerResponse.parseFrom(message.getPayload)
val expiredShuffleKeys = new util.HashSet[String]()
- if (pbHeartbeatResponse.getExpiredShuffleKeysCount > 0) {
- expiredShuffleKeys.addAll(pbHeartbeatResponse.getExpiredShuffleKeysList)
+ if (pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysCount > 0) {
+ expiredShuffleKeys.addAll(pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysList)
}
- HeartbeatResponse(expiredShuffleKeys, pbHeartbeatResponse.getRegistered)
+ HeartbeatFromWorkerResponse(expiredShuffleKeys, pbHeartbeatFromWorkerResponse.getRegistered)
case REGISTER_SHUFFLE =>
PbRegisterShuffle.parseFrom(message.getPayload)
diff --git a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
index 18feddc5f..165029b46 100644
--- a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
+++ b/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
@@ -40,7 +40,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornException;
import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplication;
import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromWorker;
-import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatResponse;
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromWorkerResponse;
import org.apache.celeborn.common.protocol.message.ControlMessages.OneWayMessageResponse$;
import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
@@ -53,7 +53,8 @@ public class RssHARetryClientSuiteJ {
private final int masterPort = 9097;
private final CelebornConf conf = new CelebornConf(false);
private final OneWayMessageResponse$ response = OneWayMessageResponse$.MODULE$;
- private final HeartbeatResponse mockResponse = Mockito.mock(HeartbeatResponse.class);
+ private final HeartbeatFromWorkerResponse mockResponse =
+ Mockito.mock(HeartbeatFromWorkerResponse.class);
private RpcEnv rpcEnv = null;
private RpcEndpointRef endpointRef = null;
@@ -154,9 +155,9 @@ public class RssHARetryClientSuiteJ {
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
- HeartbeatResponse response = null;
+ HeartbeatFromWorkerResponse response = null;
try {
- response = client.askSync(message, HeartbeatResponse.class);
+ response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should be no exceptions when sending one-way message.", t);
fail("It should be no exceptions when sending one-way message.");
@@ -176,9 +177,9 @@ public class RssHARetryClientSuiteJ {
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
- HeartbeatResponse response = null;
+ HeartbeatFromWorkerResponse response = null;
try {
- response = client.askSync(message, HeartbeatResponse.class);
+ response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
t.printStackTrace();
LOG.error("It should be no exceptions when sending one-way message.", t);
@@ -197,9 +198,9 @@ public class RssHARetryClientSuiteJ {
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
- HeartbeatResponse response = null;
+ HeartbeatFromWorkerResponse response = null;
try {
- response = client.askSync(message, HeartbeatResponse.class);
+ response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should be no exceptions when sending one-way message.", t);
fail("It should be no exceptions when sending one-way message.");
@@ -256,9 +257,9 @@ public class RssHARetryClientSuiteJ {
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
- HeartbeatResponse response = null;
+ HeartbeatFromWorkerResponse response = null;
try {
- response = client.askSync(message, HeartbeatResponse.class);
+ response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should be no exceptions when sending one-way message.", t);
fail("It should be no exceptions when sending one-way message.");
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index eda5d6022..475229aa9 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -437,7 +437,7 @@ private[celeborn] class Master(
expiredShuffleKeys.add(shuffleKey)
}
}
- context.reply(HeartbeatResponse(expiredShuffleKeys, registered))
+ context.reply(HeartbeatFromWorkerResponse(expiredShuffleKeys, registered))
}
private def handleWorkerLost(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f6775de28..f6be07d6e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -285,7 +285,7 @@ private[celeborn] class Worker(
val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava)
- val response = rssHARetryClient.askSync[HeartbeatResponse](
+ val response = rssHARetryClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
host,
rpcPort,
@@ -296,7 +296,7 @@ private[celeborn] class Worker(
resourceConsumption,
activeShuffleKeys,
estimatedAppDiskUsage),
- classOf[HeartbeatResponse])
+ classOf[HeartbeatFromWorkerResponse])
response.expiredShuffleKeys.asScala.foreach(shuffleKey =>
workerInfo.releaseSlots(shuffleKey))
cleanTaskQueue.put(response.expiredShuffleKeys)
if (!response.registered) {