This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 089bc1901 [CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC
089bc1901 is described below
commit 089bc1901a1cc534db3bf247ccd7f6c51526c319
Author: mingji <[email protected]>
AuthorDate: Fri Aug 11 22:00:51 2023 +0800
[CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC
### What changes were proposed in this pull request?
Keep the ReleaseSlots RPC so that 0.3 clients can work with
0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT.
This PR needs to be merged into main and branch-0.3.
### Why are the changes needed?
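Ditto: without this RPC, a ReleaseSlots request sent by a 0.3 client is an
unknown message type to a 0.3.1/0.4.0 master, which breaks the link.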
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #1794 from FMX/CELEBORN-846-FOLLOWUP.
Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 7d0e25700145c50557b9ccfa57ee7475ef1204e6)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 21 +++++++++-
.../common/protocol/message/ControlMessages.scala | 48 ++++++++++++++++++++++
.../celeborn/service/deploy/master/Master.scala | 4 ++
3 files changed, 71 insertions(+), 2 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 075bde8c7..d2b104670 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -25,8 +25,10 @@ enum MessageType {
REGISTER_SHUFFLE = 4;
REGISTER_SHUFFLE_RESPONSE = 5;
REQUEST_SLOTS = 6;
- // RELEASE_SLOTS = 7;
- // RELEASE_SLOTS_RESPONSE = 8;
+ // keep it for compatible with 0.3 client, will remove in 0.5
+ RELEASE_SLOTS = 7;
+ RELEASE_SLOTS_RESPONSE = 8;
+
REQUEST_SLOTS_RESPONSE = 9;
REVIVE = 10;
CHANGE_LOCATION_RESPONSE = 11;
@@ -192,6 +194,21 @@ message PbSlotInfo {
map<string, int32> slot = 1;
}
+// keep it for compatible reason
+message PbReleaseSlots {
+ string applicationId = 1;
+ int32 shuffleId = 2;
+ repeated string workerIds = 3;
+ repeated PbSlotInfo slots = 4;
+ string requestId = 6;
+}
+
+// keep it for compatible reason
+message PbReleaseSlotsResponse {
+ int32 status = 1;
+}
+
+
message PbRequestSlotsResponse {
int32 status = 1;
map<string, PbWorkerResource> workerResource = 2;
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 9607b98c9..c358e17e2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -170,6 +170,21 @@ object ControlMessages extends Logging {
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
+ // Keep it for compatible reason
+ @deprecated
+ case class ReleaseSlots(
+ applicationId: String,
+ shuffleId: Int,
+ workerIds: util.List[String],
+ slots: util.List[util.Map[String, Integer]],
+ override var requestId: String = ZERO_UUID)
+ extends MasterRequestMessage
+
+ // Keep it for compatible reason
+ @deprecated
+ case class ReleaseSlotsResponse(status: StatusCode)
+ extends MasterMessage
+
case class RequestSlotsResponse(
status: StatusCode,
workerResource: WorkerResource)
@@ -494,6 +509,23 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
+ case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
+ val pbSlots = slots.asScala.map(slot =>
+ PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
+ val payload = PbReleaseSlots.newBuilder()
+ .setApplicationId(applicationId)
+ .setShuffleId(shuffleId)
+ .setRequestId(requestId)
+ .addAllWorkerIds(workerIds)
+ .addAllSlots(pbSlots.asJava)
+ .build().toByteArray
+ new TransportMessage(MessageType.RELEASE_SLOTS, payload)
+
+ case ReleaseSlotsResponse(status) =>
+ val payload = PbReleaseSlotsResponse.newBuilder()
+ .setStatus(status.getValue).build().toByteArray
+ new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)
+
case RequestSlotsResponse(status, workerResource) =>
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
@@ -772,6 +804,22 @@ object ControlMessages extends Logging {
logError(msg)
throw new UnsupportedOperationException(msg)
+ // keep it for compatible reason
+ case RELEASE_SLOTS_VALUE =>
+ val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
+ val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
+ new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
+ ReleaseSlots(
+ pbReleaseSlots.getApplicationId,
+ pbReleaseSlots.getShuffleId,
+ new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
+ new util.ArrayList[util.Map[String, Integer]](slotsList),
+ pbReleaseSlots.getRequestId)
+
+ case RELEASE_SLOTS_RESPONSE_VALUE =>
+ val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload)
+ ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))
+
case REGISTER_WORKER_VALUE =>
PbRegisterWorker.parseFrom(message.getPayload)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 865787f10..7ea767b61 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -303,6 +303,10 @@ private[celeborn] class Master(
userResourceConsumption,
requestId))
+ case ReleaseSlots(_, _, _, _, _) =>
+ // keep it for compatible reason
+ context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
+
case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))