This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 616898bf1 [CELEBORN-739] Rename HeartbeatResponse to 
HeartbeatFromWorkerResponse
616898bf1 is described below

commit 616898bf1ff70081e5dc3070fbf366ff7e913af3
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 29 13:08:08 2023 +0800

    [CELEBORN-739] Rename HeartbeatResponse to HeartbeatFromWorkerResponse
    
    ### What changes were proposed in this pull request?
    Rename HeartbeatResponse to HeartbeatFromWorkerResponse
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1651 from AngersZhuuuu/CELEBORN-739.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 1a53db22cec1a1b393949898857d2301a9dbadb8)
    Signed-off-by: Cheng Pan <[email protected]>
---
 common/src/main/proto/TransportMessages.proto       |  2 +-
 .../common/protocol/message/ControlMessages.scala   | 15 ++++++++-------
 .../common/haclient/RssHARetryClientSuiteJ.java     | 21 +++++++++++----------
 .../celeborn/service/deploy/master/Master.scala     |  2 +-
 .../celeborn/service/deploy/worker/Worker.scala     |  4 ++--
 5 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 4b1137919..4cda0995e 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -148,7 +148,7 @@ message PbHeartbeatFromWorker {
   map<string, int64> estimatedAppDiskUsage = 10;
 }
 
-message PbHeartbeatResponse {
+message PbHeartbeatFromWorkerResponse {
   repeated string expiredShuffleKeys = 1;
   bool registered = 2;
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 43034aada..5e5b785a8 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -114,7 +114,7 @@ object ControlMessages extends Logging {
       estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
       override var requestId: String = ZERO_UUID) extends MasterRequestMessage
 
-  case class HeartbeatResponse(
+  case class HeartbeatFromWorkerResponse(
       expiredShuffleKeys: util.HashSet[String],
       registered: Boolean) extends MasterMessage
 
@@ -461,8 +461,8 @@ object ControlMessages extends Logging {
         .build().toByteArray
       new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload)
 
-    case HeartbeatResponse(expiredShuffleKeys, registered) =>
-      val payload = PbHeartbeatResponse.newBuilder()
+    case HeartbeatFromWorkerResponse(expiredShuffleKeys, registered) =>
+      val payload = PbHeartbeatFromWorkerResponse.newBuilder()
         .addAllExpiredShuffleKeys(expiredShuffleKeys)
         .setRegistered(registered)
         .build().toByteArray
@@ -816,12 +816,13 @@ object ControlMessages extends Logging {
           pbHeartbeatFromWorker.getRequestId)
 
       case HEARTBEAT_RESPONSE =>
-        val pbHeartbeatResponse = 
PbHeartbeatResponse.parseFrom(message.getPayload)
+        val pbHeartbeatFromWorkerResponse =
+          PbHeartbeatFromWorkerResponse.parseFrom(message.getPayload)
         val expiredShuffleKeys = new util.HashSet[String]()
-        if (pbHeartbeatResponse.getExpiredShuffleKeysCount > 0) {
-          
expiredShuffleKeys.addAll(pbHeartbeatResponse.getExpiredShuffleKeysList)
+        if (pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysCount > 0) {
+          
expiredShuffleKeys.addAll(pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysList)
         }
-        HeartbeatResponse(expiredShuffleKeys, 
pbHeartbeatResponse.getRegistered)
+        HeartbeatFromWorkerResponse(expiredShuffleKeys, 
pbHeartbeatFromWorkerResponse.getRegistered)
 
       case REGISTER_SHUFFLE =>
         PbRegisterShuffle.parseFrom(message.getPayload)
diff --git 
a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
index 18feddc5f..165029b46 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java
@@ -40,7 +40,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornException;
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplication;
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromWorker;
-import 
org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatResponse;
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromWorkerResponse;
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.OneWayMessageResponse$;
 import org.apache.celeborn.common.rpc.RpcAddress;
 import org.apache.celeborn.common.rpc.RpcEndpointRef;
@@ -53,7 +53,8 @@ public class RssHARetryClientSuiteJ {
   private final int masterPort = 9097;
   private final CelebornConf conf = new CelebornConf(false);
   private final OneWayMessageResponse$ response = 
OneWayMessageResponse$.MODULE$;
-  private final HeartbeatResponse mockResponse = 
Mockito.mock(HeartbeatResponse.class);
+  private final HeartbeatFromWorkerResponse mockResponse =
+      Mockito.mock(HeartbeatFromWorkerResponse.class);
 
   private RpcEnv rpcEnv = null;
   private RpcEndpointRef endpointRef = null;
@@ -154,9 +155,9 @@ public class RssHARetryClientSuiteJ {
     RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
-    HeartbeatResponse response = null;
+    HeartbeatFromWorkerResponse response = null;
     try {
-      response = client.askSync(message, HeartbeatResponse.class);
+      response = client.askSync(message, HeartbeatFromWorkerResponse.class);
     } catch (Throwable t) {
       LOG.error("It should be no exceptions when sending one-way message.", t);
       fail("It should be no exceptions when sending one-way message.");
@@ -176,9 +177,9 @@ public class RssHARetryClientSuiteJ {
     RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
-    HeartbeatResponse response = null;
+    HeartbeatFromWorkerResponse response = null;
     try {
-      response = client.askSync(message, HeartbeatResponse.class);
+      response = client.askSync(message, HeartbeatFromWorkerResponse.class);
     } catch (Throwable t) {
       t.printStackTrace();
       LOG.error("It should be no exceptions when sending one-way message.", t);
@@ -197,9 +198,9 @@ public class RssHARetryClientSuiteJ {
     RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
-    HeartbeatResponse response = null;
+    HeartbeatFromWorkerResponse response = null;
     try {
-      response = client.askSync(message, HeartbeatResponse.class);
+      response = client.askSync(message, HeartbeatFromWorkerResponse.class);
     } catch (Throwable t) {
       LOG.error("It should be no exceptions when sending one-way message.", t);
       fail("It should be no exceptions when sending one-way message.");
@@ -256,9 +257,9 @@ public class RssHARetryClientSuiteJ {
     RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
     HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
 
-    HeartbeatResponse response = null;
+    HeartbeatFromWorkerResponse response = null;
     try {
-      response = client.askSync(message, HeartbeatResponse.class);
+      response = client.askSync(message, HeartbeatFromWorkerResponse.class);
     } catch (Throwable t) {
       LOG.error("It should be no exceptions when sending one-way message.", t);
       fail("It should be no exceptions when sending one-way message.");
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index eda5d6022..475229aa9 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -437,7 +437,7 @@ private[celeborn] class Master(
         expiredShuffleKeys.add(shuffleKey)
       }
     }
-    context.reply(HeartbeatResponse(expiredShuffleKeys, registered))
+    context.reply(HeartbeatFromWorkerResponse(expiredShuffleKeys, registered))
   }
 
   private def handleWorkerLost(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f6775de28..f6be07d6e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -285,7 +285,7 @@ private[celeborn] class Worker(
     val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
       storageManager.userResourceConsumptionSnapshot().asJava)
 
-    val response = rssHARetryClient.askSync[HeartbeatResponse](
+    val response = rssHARetryClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
         host,
         rpcPort,
@@ -296,7 +296,7 @@ private[celeborn] class Worker(
         resourceConsumption,
         activeShuffleKeys,
         estimatedAppDiskUsage),
-      classOf[HeartbeatResponse])
+      classOf[HeartbeatFromWorkerResponse])
     response.expiredShuffleKeys.asScala.foreach(shuffleKey => 
workerInfo.releaseSlots(shuffleKey))
     cleanTaskQueue.put(response.expiredShuffleKeys)
     if (!response.registered) {

Reply via email to