This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7d0e25700 [CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC
7d0e25700 is described below
commit 7d0e25700145c50557b9ccfa57ee7475ef1204e6
Author: mingji <[email protected]>
AuthorDate: Fri Aug 11 22:00:51 2023 +0800
[CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC
### What changes were proposed in this pull request?
Keep the ReleaseSlots RPC so that 0.3 clients can work with
0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT.
This PR needs to be merged into both main and branch-0.3.
### Why are the changes needed?
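Ditto. Without the ReleaseSlots RPC, newer versions treat the ReleaseSlots
request sent by 0.3 clients as an unknown RPC, which breaks those clients.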
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #1794 from FMX/CELEBORN-846-FOLLOWUP.
Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 21 +++++++++-
.../common/protocol/message/ControlMessages.scala | 48 ++++++++++++++++++++++
.../celeborn/service/deploy/master/Master.scala | 4 ++
3 files changed, 71 insertions(+), 2 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index b59ab2c2a..87a51328a 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -25,8 +25,10 @@ enum MessageType {
REGISTER_SHUFFLE = 4;
REGISTER_SHUFFLE_RESPONSE = 5;
REQUEST_SLOTS = 6;
- // RELEASE_SLOTS = 7;
- // RELEASE_SLOTS_RESPONSE = 8;
+ // Kept for compatibility with 0.3 clients; will be removed in 0.5
+ RELEASE_SLOTS = 7;
+ RELEASE_SLOTS_RESPONSE = 8;
+
REQUEST_SLOTS_RESPONSE = 9;
CHANGE_LOCATION = 10;
CHANGE_LOCATION_RESPONSE = 11;
@@ -192,6 +194,21 @@ message PbSlotInfo {
map<string, int32> slot = 1;
}
+// Kept for compatibility with 0.3 clients
+message PbReleaseSlots {
+ string applicationId = 1;
+ int32 shuffleId = 2;
+ repeated string workerIds = 3;
+ repeated PbSlotInfo slots = 4;
+ string requestId = 6;
+}
+
+// Kept for compatibility with 0.3 clients
+message PbReleaseSlotsResponse {
+ int32 status = 1;
+}
+
+
message PbRequestSlotsResponse {
int32 status = 1;
map<string, PbWorkerResource> workerResource = 2;
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index c9c6a2f49..d376832a3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -170,6 +170,21 @@ object ControlMessages extends Logging {
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
+ // Kept for compatibility with 0.3 clients
+ @deprecated
+ case class ReleaseSlots(
+ applicationId: String,
+ shuffleId: Int,
+ workerIds: util.List[String],
+ slots: util.List[util.Map[String, Integer]],
+ override var requestId: String = ZERO_UUID)
+ extends MasterRequestMessage
+
+ // Kept for compatibility with 0.3 clients
+ @deprecated
+ case class ReleaseSlotsResponse(status: StatusCode)
+ extends MasterMessage
+
case class RequestSlotsResponse(
status: StatusCode,
workerResource: WorkerResource)
@@ -487,6 +502,23 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
+ case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
+ val pbSlots = slots.asScala.map(slot =>
+ PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
+ val payload = PbReleaseSlots.newBuilder()
+ .setApplicationId(applicationId)
+ .setShuffleId(shuffleId)
+ .setRequestId(requestId)
+ .addAllWorkerIds(workerIds)
+ .addAllSlots(pbSlots.asJava)
+ .build().toByteArray
+ new TransportMessage(MessageType.RELEASE_SLOTS, payload)
+
+ case ReleaseSlotsResponse(status) =>
+ val payload = PbReleaseSlotsResponse.newBuilder()
+ .setStatus(status.getValue).build().toByteArray
+ new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)
+
case RequestSlotsResponse(status, workerResource) =>
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
@@ -746,6 +778,22 @@ object ControlMessages extends Logging {
logError(msg)
throw new UnsupportedOperationException(msg)
+ // Kept for compatibility with 0.3 clients
+ case RELEASE_SLOTS_VALUE =>
+ val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
+ val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
+ new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
+ ReleaseSlots(
+ pbReleaseSlots.getApplicationId,
+ pbReleaseSlots.getShuffleId,
+ new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
+ new util.ArrayList[util.Map[String, Integer]](slotsList),
+ pbReleaseSlots.getRequestId)
+
+ case RELEASE_SLOTS_RESPONSE_VALUE =>
+ val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload)
+ ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))
+
case REGISTER_WORKER_VALUE =>
PbRegisterWorker.parseFrom(message.getPayload)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index d5ce62827..466e088d1 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -303,6 +303,10 @@ private[celeborn] class Master(
userResourceConsumption,
requestId))
+ case ReleaseSlots(_, _, _, _, _) =>
+ // Kept for compatibility with 0.3 clients
+ context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
+
case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))