This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d0e25700 [CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown 
RPC
7d0e25700 is described below

commit 7d0e25700145c50557b9ccfa57ee7475ef1204e6
Author: mingji <[email protected]>
AuthorDate: Fri Aug 11 22:00:51 2023 +0800

    [CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC
    
    ### What changes were proposed in this pull request?
    Keep ReleaseSlots RPC to make sure that 0.3 client can worker with 
0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT.
    This PR will need to merged into main and branch-0.3.
    
    ### Why are the changes needed?
    Ditto.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA and cluster.
    
    Closes #1794 from FMX/CELEBORN-846-FOLLOWUP.
    
    Lead-authored-by: mingji <[email protected]>
    Co-authored-by: Ethan Feng <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      | 21 +++++++++-
 .../common/protocol/message/ControlMessages.scala  | 48 ++++++++++++++++++++++
 .../celeborn/service/deploy/master/Master.scala    |  4 ++
 3 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index b59ab2c2a..87a51328a 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -25,8 +25,10 @@ enum MessageType {
   REGISTER_SHUFFLE = 4;
   REGISTER_SHUFFLE_RESPONSE = 5;
   REQUEST_SLOTS = 6;
-  //  RELEASE_SLOTS = 7;
-  //  RELEASE_SLOTS_RESPONSE = 8;
+  // keep it for compatible with 0.3 client, will remove in 0.5
+  RELEASE_SLOTS = 7;
+  RELEASE_SLOTS_RESPONSE = 8;
+
   REQUEST_SLOTS_RESPONSE = 9;
   CHANGE_LOCATION = 10;
   CHANGE_LOCATION_RESPONSE = 11;
@@ -192,6 +194,21 @@ message PbSlotInfo {
   map<string, int32> slot = 1;
 }
 
+// keep it for compatible reason
+message PbReleaseSlots {
+  string applicationId = 1;
+  int32 shuffleId = 2;
+  repeated string workerIds = 3;
+  repeated PbSlotInfo slots = 4;
+  string requestId = 6;
+}
+
+// keep it for compatible reason
+message PbReleaseSlotsResponse {
+  int32 status = 1;
+}
+
+
 message PbRequestSlotsResponse {
   int32 status = 1;
   map<string, PbWorkerResource> workerResource = 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index c9c6a2f49..d376832a3 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -170,6 +170,21 @@ object ControlMessages extends Logging {
       override var requestId: String = ZERO_UUID)
     extends MasterRequestMessage
 
+  // Keep it for compatible reason
+  @deprecated
+  case class ReleaseSlots(
+      applicationId: String,
+      shuffleId: Int,
+      workerIds: util.List[String],
+      slots: util.List[util.Map[String, Integer]],
+      override var requestId: String = ZERO_UUID)
+    extends MasterRequestMessage
+
+  // Keep it for compatible reason
+  @deprecated
+  case class ReleaseSlotsResponse(status: StatusCode)
+    extends MasterMessage
+
   case class RequestSlotsResponse(
       status: StatusCode,
       workerResource: WorkerResource)
@@ -487,6 +502,23 @@ object ControlMessages extends Logging {
         .build().toByteArray
       new TransportMessage(MessageType.REQUEST_SLOTS, payload)
 
+    case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
+      val pbSlots = slots.asScala.map(slot =>
+        PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
+      val payload = PbReleaseSlots.newBuilder()
+        .setApplicationId(applicationId)
+        .setShuffleId(shuffleId)
+        .setRequestId(requestId)
+        .addAllWorkerIds(workerIds)
+        .addAllSlots(pbSlots.asJava)
+        .build().toByteArray
+      new TransportMessage(MessageType.RELEASE_SLOTS, payload)
+
+    case ReleaseSlotsResponse(status) =>
+      val payload = PbReleaseSlotsResponse.newBuilder()
+        .setStatus(status.getValue).build().toByteArray
+      new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)
+
     case RequestSlotsResponse(status, workerResource) =>
       val builder = PbRequestSlotsResponse.newBuilder()
         .setStatus(status.getValue)
@@ -746,6 +778,22 @@ object ControlMessages extends Logging {
         logError(msg)
         throw new UnsupportedOperationException(msg)
 
+      // keep it for compatible reason
+      case RELEASE_SLOTS_VALUE =>
+        val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
+        val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
+          new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
+        ReleaseSlots(
+          pbReleaseSlots.getApplicationId,
+          pbReleaseSlots.getShuffleId,
+          new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
+          new util.ArrayList[util.Map[String, Integer]](slotsList),
+          pbReleaseSlots.getRequestId)
+
+      case RELEASE_SLOTS_RESPONSE_VALUE =>
+        val pbReleaseSlotsResponse = 
PbReleaseSlotsResponse.parseFrom(message.getPayload)
+        
ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))
+
       case REGISTER_WORKER_VALUE =>
         PbRegisterWorker.parseFrom(message.getPayload)
 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index d5ce62827..466e088d1 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -303,6 +303,10 @@ private[celeborn] class Master(
           userResourceConsumption,
           requestId))
 
+    case ReleaseSlots(_, _, _, _, _) =>
+      // keep it for compatible reason
+      context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
+
     case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
       logTrace(s"Received RequestSlots request $requestSlots.")
       executeWithLeaderChecker(context, handleRequestSlots(context, 
requestSlots))

Reply via email to