This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e82a8e899 [CELEBORN-846] Remove unused updateReleaseSlotsMeta in
master side
e82a8e899 is described below
commit e82a8e8992da26c02391090b9150faf6de922371
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jul 27 17:46:00 2023 +0800
[CELEBORN-846] Remove unused updateReleaseSlotsMeta in master side
### What changes were proposed in this pull request?
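As the title says: remove the unused ReleaseSlots RPC end to end. This covers the LifecycleManager calls, the ReleaseSlots/ReleaseSlotsResponse control messages and their protobuf definitions, the master-side handleReleaseSlots/updateReleaseSlotsMeta handling (single-master and HA), and the test calls that exercised it.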
### Why are the changes needed?
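CELEBORN-791 removed sending the ReleaseSlotsRequest from the worker, so the Master no longer needs to handle it.
The Master's handleReleaseSlots already ignored the message and only replied SUCCESS for compatibility. The ReleaseSlots requests that LifecycleManager still sent were therefore no-ops and can be removed as well.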
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1767 from AngersZhuuuu/CELEBORN-846.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 40 --------------------
common/src/main/proto/TransportMessages.proto | 16 +-------
.../common/protocol/message/ControlMessages.scala | 43 ----------------------
.../master/clustermeta/AbstractMetaManager.java | 23 ------------
.../master/clustermeta/IMetadataHandler.java | 6 ---
.../clustermeta/SingleMasterMetaManager.java | 9 -----
.../master/clustermeta/ha/HAMasterMetaManager.java | 29 ---------------
.../celeborn/service/deploy/master/Master.scala | 18 ---------
.../clustermeta/DefaultMetaSystemSuiteJ.java | 2 -
.../ha/RatisMasterStatusSystemSuiteJ.java | 1 -
10 files changed, 2 insertions(+), 185 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bc11519fd..00c2c4847 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -456,9 +456,6 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
if (!reserveSlotsSuccess) {
logError(s"reserve buffer for $shuffleId failed, reply to all.")
reply(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED,
Array.empty))
- // tell Primary to release slots
- requestPrimaryReleaseSlots(
- ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava,
List.empty.asJava))
} else {
logInfo(s"ReserveSlots for $shuffleId success!")
logDebug(s"Allocated Slots: $slots")
@@ -576,8 +573,6 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
partitionLocationInfo.removeAllPrimaryPartitions()
partitionLocationInfo.removeAllReplicaPartitions()
}
- requestPrimaryReleaseSlots(
- ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava,
List.empty.asJava))
}
}
@@ -628,13 +623,6 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
}
}
- if (shuffleResourceExists(shuffleId)) {
- logWarning(s"Partition exists for shuffle $shuffleId, " +
- "maybe caused by task rerun or speculative.")
- requestPrimaryReleaseSlots(
- ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava,
List.empty.asJava))
- }
-
// add shuffleKey to delay shuffle removal set
unregisterShuffleTime.put(shuffleId, System.currentTimeMillis())
@@ -762,19 +750,6 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
destroySlotsWithRetry(shuffleId, destroyResource)
logInfo(s"Destroyed peer partitions for reserve buffer failed workers " +
s"shuffleId $shuffleId, $destroyResource")
-
- val workerIds = new util.ArrayList[String]()
- val workerSlotsPerDisk = new util.ArrayList[util.Map[String, Integer]]()
- Utils.getSlotsPerDisk(destroyResource).asScala.foreach {
- case (workerInfo, slotsPerDisk) =>
- workerIds.add(workerInfo.toUniqueId())
- workerSlotsPerDisk.add(slotsPerDisk)
- }
- val msg = ReleaseSlots(appUniqueId, shuffleId, workerIds,
workerSlotsPerDisk)
- requestPrimaryReleaseSlots(msg)
- logDebug(s"Released slots for reserve buffer failed workers " +
- s"${workerIds.asScala.mkString(",")}" +
s"${slots.asScala.mkString(",")}" +
- s"shuffleId $shuffleId")
}
}
@@ -1089,16 +1064,6 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
}
}
- private def requestPrimaryReleaseSlots(message: ReleaseSlots):
ReleaseSlotsResponse = {
- try {
- masterClient.askSync[ReleaseSlotsResponse](message,
classOf[ReleaseSlotsResponse])
- } catch {
- case e: Exception =>
- logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
- ReleaseSlotsResponse(StatusCode.REQUEST_FAILED)
- }
- }
-
private def requestPrimaryUnregisterShuffle(message: PbUnregisterShuffle)
: PbUnregisterShuffleResponse = {
try {
@@ -1125,11 +1090,6 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
}
}
- private def shuffleResourceExists(shuffleId: Int): Boolean = {
- val workerPartitionInfos = workerSnapshots(shuffleId)
- workerPartitionInfos != null &&
workerPartitionInfos.values().asScala.exists(!_.isEmpty())
- }
-
// Once a partition is released, it will be never needed anymore
def releasePartition(shuffleId: Int, partitionId: Int): Unit = {
commitManager.releasePartitionResource(shuffleId, partitionId)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 0d9b67bcb..0454ceff1 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -25,8 +25,8 @@ enum MessageType {
REGISTER_SHUFFLE = 4;
REGISTER_SHUFFLE_RESPONSE = 5;
REQUEST_SLOTS = 6;
- RELEASE_SLOTS = 7;
- RELEASE_SLOTS_RESPONSE = 8;
+ // RELEASE_SLOTS = 7;
+ // RELEASE_SLOTS_RESPONSE = 8;
REQUEST_SLOTS_RESPONSE = 9;
CHANGE_LOCATION = 10;
CHANGE_LOCATION_RESPONSE = 11;
@@ -189,18 +189,6 @@ message PbSlotInfo {
map<string, int32> slot = 1;
}
-message PbReleaseSlots {
- string applicationId = 1;
- int32 shuffleId = 2;
- repeated string workerIds = 3;
- repeated PbSlotInfo slots = 4;
- string requestId = 6;
-}
-
-message PbReleaseSlotsResponse {
- int32 status = 1;
-}
-
message PbRequestSlotsResponse {
int32 status = 1;
map<string, PbWorkerResource> workerResource = 2;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 1be450005..faeb02b17 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -169,17 +169,6 @@ object ControlMessages extends Logging {
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
- case class ReleaseSlots(
- applicationId: String,
- shuffleId: Int,
- workerIds: util.List[String],
- slots: util.List[util.Map[String, Integer]],
- override var requestId: String = ZERO_UUID)
- extends MasterRequestMessage
-
- case class ReleaseSlotsResponse(status: StatusCode)
- extends MasterMessage
-
case class RequestSlotsResponse(
status: StatusCode,
workerResource: WorkerResource)
@@ -495,23 +484,6 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
- case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
- val pbSlots = slots.asScala.map(slot =>
- PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
- val payload = PbReleaseSlots.newBuilder()
- .setApplicationId(applicationId)
- .setShuffleId(shuffleId)
- .setRequestId(requestId)
- .addAllWorkerIds(workerIds)
- .addAllSlots(pbSlots.asJava)
- .build().toByteArray
- new TransportMessage(MessageType.RELEASE_SLOTS, payload)
-
- case ReleaseSlotsResponse(status) =>
- val payload = PbReleaseSlotsResponse.newBuilder()
- .setStatus(status.getValue).build().toByteArray
- new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)
-
case RequestSlotsResponse(status, workerResource) =>
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
@@ -830,21 +802,6 @@ object ControlMessages extends Logging {
userIdentifier,
pbRequestSlots.getRequestId)
- case RELEASE_SLOTS_VALUE =>
- val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
- val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
- new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
- ReleaseSlots(
- pbReleaseSlots.getApplicationId,
- pbReleaseSlots.getShuffleId,
- new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
- new util.ArrayList[util.Map[String, Integer]](slotsList),
- pbReleaseSlots.getRequestId)
-
- case RELEASE_SLOTS_RESPONSE_VALUE =>
- val pbReleaseSlotsResponse =
PbReleaseSlotsResponse.parseFrom(message.getPayload)
-
ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))
-
case REQUEST_SLOTS_RESPONSE_VALUE =>
val pbRequestSlotsResponse =
PbRequestSlotsResponse.parseFrom(message.getPayload)
RequestSlotsResponse(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 60e75f8c3..8df57405f 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -95,29 +95,6 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
}
- public void updateReleaseSlotsMeta(String shuffleKey) {
- updateReleaseSlotsMeta(shuffleKey, null, null);
- }
-
- public void updateReleaseSlotsMeta(
- String shuffleKey, List<String> workerIds, List<Map<String, Integer>>
slots) {
- if (workerIds != null && !workerIds.isEmpty()) {
- for (int i = 0; i < workerIds.size(); i++) {
- String workerId = workerIds.get(i);
- WorkerInfo worker = WorkerInfo.fromUniqueId(workerId);
- for (WorkerInfo w : workers) {
- if (w.equals(worker)) {
- Map<String, Integer> slotToRelease = slots.get(i);
- LOG.info("release slots for worker {}, to release: {}", w,
slotToRelease);
- w.releaseSlots(shuffleKey, slotToRelease);
- }
- }
- }
- } else {
- workers.forEach(workerInfo -> workerInfo.releaseSlots(shuffleKey));
- }
- }
-
public void updateUnregisterShuffleMeta(String shuffleKey) {
registeredShuffle.remove(shuffleKey);
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 84b654653..6c4c65a73 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -32,12 +32,6 @@ public interface IMetadataHandler {
Map<String, Map<String, Integer>> workerToAllocatedSlots,
String requestId);
- void handleReleaseSlots(
- String shuffleKey,
- List<String> workerIds,
- List<Map<String, Integer>> slotStrings,
- String requestId);
-
void handleUnRegisterShuffle(String shuffleKey, String requestId);
void handleAppHeartbeat(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 50d8a66b2..3d12db8b4 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -53,15 +53,6 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
updateRequestSlotsMeta(shuffleKey, hostName, workerToAllocatedSlots);
}
- @Override
- public void handleReleaseSlots(
- String shuffleKey,
- List<String> workerIds,
- List<Map<String, Integer>> slotStrings,
- String requestId) {
- updateReleaseSlotsMeta(shuffleKey, workerIds, slotStrings);
- }
-
@Override
public void handleUnRegisterShuffle(String shuffleKey, String requestId) {
updateUnregisterShuffleMeta(shuffleKey);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 352259a2b..f7a10013c 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -85,35 +85,6 @@ public class HAMasterMetaManager extends AbstractMetaManager
{
}
}
- @Override
- public void handleReleaseSlots(
- String shuffleKey,
- List<String> workerIds,
- List<Map<String, Integer>> slots,
- String requestId) {
- try {
- ratisServer.submitRequest(
- ResourceRequest.newBuilder()
- .setCmdType(Type.ReleaseSlots)
- .setRequestId(requestId)
- .setReleaseSlotsRequest(
- ResourceProtos.ReleaseSlotsRequest.newBuilder()
- .setShuffleKey(shuffleKey)
- .addAllWorkerIds(workerIds)
- .addAllSlots(
- slots.stream()
- .map(
- slot ->
-
ResourceProtos.SlotInfo.newBuilder().putAllSlot(slot).build())
- .collect(Collectors.toList()))
- .build())
- .build());
- } catch (CelebornRuntimeException e) {
- LOG.error("Handle release slots for {} failed!", shuffleKey, e);
- throw e;
- }
- }
-
@Override
public void handleUnRegisterShuffle(String shuffleKey, String requestId) {
try {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 828889820..04d98a217 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -305,13 +305,6 @@ private[celeborn] class Master(
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context,
requestSlots))
- case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
- logTrace(s"Received ReleaseSlots request $requestId, $applicationId,
$shuffleId," +
- s"workers ${workerIds.asScala.mkString(",")}, slots
${slots.asScala.mkString(",")}")
- executeWithLeaderChecker(
- context,
- handleReleaseSlots(context, applicationId, shuffleId, workerIds,
slots, requestId))
-
case pb: PbUnregisterShuffle =>
val applicationId = pb.getAppId
val shuffleId = pb.getShuffleId
@@ -627,17 +620,6 @@ private[celeborn] class Master(
context.reply(RequestSlotsResponse(StatusCode.SUCCESS,
slots.asInstanceOf[WorkerResource]))
}
- def handleReleaseSlots(
- context: RpcCallContext,
- applicationId: String,
- shuffleId: Int,
- workerIds: util.List[String],
- slots: util.List[util.Map[String, Integer]],
- requestId: String): Unit = {
- // For compatibility, ignore this message
- context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
- }
-
def handleUnregisterShuffle(
context: RpcCallContext,
applicationId: String,
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 665af980d..aaae7861a 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -316,8 +316,6 @@ public class DefaultMetaSystemSuiteJ {
}
});
- statusSystem.handleReleaseSlots(SHUFFLEKEY1, workerIds, workerSlots,
getNewReqeustId());
-
Assert.assertEquals(
0,
statusSystem.workers.stream()
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 3684e095b..1d0e2c17b 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -494,7 +494,6 @@ public class RatisMasterStatusSystemSuiteJ {
}
});
- statusSystem.handleReleaseSlots(SHUFFLEKEY1, workerIds, workerSlots,
getNewReqeustId());
Thread.sleep(3000L);
// Do not update diskinfo's activeslots