This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch branch-0.3 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 70275949f0bb1f51ecd426ed4644e260e6b095bc Author: Angerszhuuuu <[email protected]> AuthorDate: Thu Jul 27 17:46:00 2023 +0800 [CELEBORN-846] Remove unused updateReleaseSlotsMeta in master side ### What changes were proposed in this pull request? As title ### Why are the changes needed? CELEBORN-791 removed sending the ReleaseSlotsRequest from worker, so Master is not required to handle it. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1767 from AngersZhuuuu/CELEBORN-846. Authored-by: Angerszhuuuu <[email protected]> Signed-off-by: Angerszhuuuu <[email protected]> --- .../apache/celeborn/client/LifecycleManager.scala | 40 -------------------- common/src/main/proto/TransportMessages.proto | 16 +------- .../common/protocol/message/ControlMessages.scala | 43 ---------------------- .../master/clustermeta/AbstractMetaManager.java | 23 ------------ .../master/clustermeta/IMetadataHandler.java | 6 --- .../clustermeta/SingleMasterMetaManager.java | 9 ----- .../master/clustermeta/ha/HAMasterMetaManager.java | 29 --------------- .../celeborn/service/deploy/master/Master.scala | 18 --------- .../clustermeta/DefaultMetaSystemSuiteJ.java | 2 - .../ha/RatisMasterStatusSystemSuiteJ.java | 1 - 10 files changed, 2 insertions(+), 185 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index bc11519fd..00c2c4847 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -456,9 +456,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (!reserveSlotsSuccess) { logError(s"reserve buffer for $shuffleId failed, reply to all.") reply(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED, Array.empty)) - // tell Primary to release slots - requestPrimaryReleaseSlots( - ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava, List.empty.asJava)) } else { logInfo(s"ReserveSlots for $shuffleId success!") logDebug(s"Allocated Slots: $slots") @@ -576,8 +573,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends partitionLocationInfo.removeAllPrimaryPartitions() partitionLocationInfo.removeAllReplicaPartitions() } - requestPrimaryReleaseSlots( - ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava, List.empty.asJava)) } } @@ -628,13 +623,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends } } - if (shuffleResourceExists(shuffleId)) { - logWarning(s"Partition exists for shuffle $shuffleId, " + - "maybe caused by task rerun or speculative.") - requestPrimaryReleaseSlots( - ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava, List.empty.asJava)) - } - // add shuffleKey to delay shuffle removal set unregisterShuffleTime.put(shuffleId, System.currentTimeMillis()) @@ -762,19 +750,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends destroySlotsWithRetry(shuffleId, destroyResource) logInfo(s"Destroyed peer partitions for reserve buffer failed workers " + s"shuffleId $shuffleId, $destroyResource") - - val workerIds = new util.ArrayList[String]() - val workerSlotsPerDisk = new util.ArrayList[util.Map[String, Integer]]() - Utils.getSlotsPerDisk(destroyResource).asScala.foreach { - case (workerInfo, slotsPerDisk) => - workerIds.add(workerInfo.toUniqueId()) - workerSlotsPerDisk.add(slotsPerDisk) - } - val msg = ReleaseSlots(appUniqueId, shuffleId, workerIds, workerSlotsPerDisk) - requestPrimaryReleaseSlots(msg) - logDebug(s"Released slots for reserve buffer failed workers " + - s"${workerIds.asScala.mkString(",")}" + s"${slots.asScala.mkString(",")}" + - s"shuffleId $shuffleId") } } @@ -1089,16 +1064,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends } } - private def requestPrimaryReleaseSlots(message: ReleaseSlots): ReleaseSlotsResponse = { - try { - masterClient.askSync[ReleaseSlotsResponse](message, classOf[ReleaseSlotsResponse]) - } catch { - case e: Exception => - logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e) - ReleaseSlotsResponse(StatusCode.REQUEST_FAILED) - } - } - private def requestPrimaryUnregisterShuffle(message: PbUnregisterShuffle) : PbUnregisterShuffleResponse = { try { @@ -1125,11 +1090,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends } } - private def shuffleResourceExists(shuffleId: Int): Boolean = { - val workerPartitionInfos = workerSnapshots(shuffleId) - workerPartitionInfos != null && workerPartitionInfos.values().asScala.exists(!_.isEmpty()) - } - // Once a partition is released, it will be never needed anymore def releasePartition(shuffleId: Int, partitionId: Int): Unit = { commitManager.releasePartitionResource(shuffleId, partitionId) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index a1c693c03..ae37c27a8 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -25,8 +25,8 @@ enum MessageType { REGISTER_SHUFFLE = 4; REGISTER_SHUFFLE_RESPONSE = 5; REQUEST_SLOTS = 6; - RELEASE_SLOTS = 7; - RELEASE_SLOTS_RESPONSE = 8; + // RELEASE_SLOTS = 7; + // RELEASE_SLOTS_RESPONSE = 8; REQUEST_SLOTS_RESPONSE = 9; REVIVE = 10; CHANGE_LOCATION_RESPONSE = 11; @@ -191,18 +191,6 @@ message PbSlotInfo { map<string, int32> slot = 1; } -message PbReleaseSlots { - string applicationId = 1; - int32 shuffleId = 2; - repeated string workerIds = 3; - repeated PbSlotInfo slots = 4; - string requestId = 6; -} - -message PbReleaseSlotsResponse { - int32 status = 1; -} - message PbRequestSlotsResponse { int32 status = 1; map<string, PbWorkerResource> workerResource = 2; diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index a0ba251a8..df2df24ad 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -169,17 +169,6 @@ object ControlMessages extends Logging { override var requestId: String = ZERO_UUID) extends MasterRequestMessage - case class ReleaseSlots( - applicationId: String, - shuffleId: Int, - workerIds: util.List[String], - slots: util.List[util.Map[String, Integer]], - override var requestId: String = ZERO_UUID) - extends MasterRequestMessage - - case class ReleaseSlotsResponse(status: StatusCode) - extends MasterMessage - case class RequestSlotsResponse( status: StatusCode, workerResource: WorkerResource) @@ -502,23 +491,6 @@ object ControlMessages extends Logging { .build().toByteArray new TransportMessage(MessageType.REQUEST_SLOTS, payload) - case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) => - val pbSlots = slots.asScala.map(slot => - PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList - val payload = PbReleaseSlots.newBuilder() - .setApplicationId(applicationId) - .setShuffleId(shuffleId) - .setRequestId(requestId) - .addAllWorkerIds(workerIds) - .addAllSlots(pbSlots.asJava) - .build().toByteArray - new TransportMessage(MessageType.RELEASE_SLOTS, payload) - - case ReleaseSlotsResponse(status) => - val payload = PbReleaseSlotsResponse.newBuilder() - .setStatus(status.getValue).build().toByteArray - new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload) - case RequestSlotsResponse(status, workerResource) => val builder = PbRequestSlotsResponse.newBuilder() .setStatus(status.getValue) @@ -856,21 +828,6 @@ object ControlMessages extends Logging { userIdentifier, pbRequestSlots.getRequestId) - case RELEASE_SLOTS_VALUE => - val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload) - val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot => - new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava - ReleaseSlots( - pbReleaseSlots.getApplicationId, - pbReleaseSlots.getShuffleId, - new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList), - new util.ArrayList[util.Map[String, Integer]](slotsList), - pbReleaseSlots.getRequestId) - - case RELEASE_SLOTS_RESPONSE_VALUE => - val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload) - ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus)) - case REQUEST_SLOTS_RESPONSE_VALUE => val pbRequestSlotsResponse = PbRequestSlotsResponse.parseFrom(message.getPayload) RequestSlotsResponse( diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 60e75f8c3..8df57405f 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -95,29 +95,6 @@ public abstract class AbstractMetaManager implements IMetadataHandler { } } - public void updateReleaseSlotsMeta(String shuffleKey) { - updateReleaseSlotsMeta(shuffleKey, null, null); - } - - public void updateReleaseSlotsMeta( - String shuffleKey, List<String> workerIds, List<Map<String, Integer>> slots) { - if (workerIds != null && !workerIds.isEmpty()) { - for (int i = 0; i < workerIds.size(); i++) { - String workerId = workerIds.get(i); - WorkerInfo worker = WorkerInfo.fromUniqueId(workerId); - for (WorkerInfo w : workers) { - if (w.equals(worker)) { - Map<String, Integer> slotToRelease = slots.get(i); - LOG.info("release slots for worker {}, to release: {}", w, slotToRelease); - w.releaseSlots(shuffleKey, slotToRelease); - } - } - } - } else { - workers.forEach(workerInfo -> workerInfo.releaseSlots(shuffleKey)); - } - } - public void updateUnregisterShuffleMeta(String shuffleKey) { registeredShuffle.remove(shuffleKey); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 84b654653..6c4c65a73 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -32,12 +32,6 @@ public interface IMetadataHandler { Map<String, Map<String, Integer>> workerToAllocatedSlots, String requestId); - void handleReleaseSlots( - String shuffleKey, - List<String> workerIds, - List<Map<String, Integer>> slotStrings, - String requestId); - void handleUnRegisterShuffle(String shuffleKey, String requestId); void handleAppHeartbeat( diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index 50d8a66b2..3d12db8b4 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -53,15 +53,6 @@ public class SingleMasterMetaManager extends AbstractMetaManager { updateRequestSlotsMeta(shuffleKey, hostName, workerToAllocatedSlots); } - @Override - public void handleReleaseSlots( - String shuffleKey, - List<String> workerIds, - List<Map<String, Integer>> slotStrings, - String requestId) { - updateReleaseSlotsMeta(shuffleKey, workerIds, slotStrings); - } - @Override public void handleUnRegisterShuffle(String shuffleKey, String requestId) { updateUnregisterShuffleMeta(shuffleKey); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index 352259a2b..f7a10013c 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -85,35 +85,6 @@ public class HAMasterMetaManager extends AbstractMetaManager { } } - @Override - public void handleReleaseSlots( - String shuffleKey, - List<String> workerIds, - List<Map<String, Integer>> slots, - String requestId) { - try { - ratisServer.submitRequest( - ResourceRequest.newBuilder() - .setCmdType(Type.ReleaseSlots) - .setRequestId(requestId) - .setReleaseSlotsRequest( - ResourceProtos.ReleaseSlotsRequest.newBuilder() - .setShuffleKey(shuffleKey) - .addAllWorkerIds(workerIds) - .addAllSlots( - slots.stream() - .map( - slot -> - ResourceProtos.SlotInfo.newBuilder().putAllSlot(slot).build()) - .collect(Collectors.toList())) - .build()) - .build()); - } catch (CelebornRuntimeException e) { - LOG.error("Handle release slots for {} failed!", shuffleKey, e); - throw e; - } - } - @Override public void handleUnRegisterShuffle(String shuffleKey, String requestId) { try { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 690fd30b8..3c34b4ab2 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -305,13 +305,6 @@ private[celeborn] class Master( logTrace(s"Received RequestSlots request $requestSlots.") executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots)) - case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) => - logTrace(s"Received ReleaseSlots request $requestId, $applicationId, $shuffleId," + - s"workers ${workerIds.asScala.mkString(",")}, slots ${slots.asScala.mkString(",")}") - executeWithLeaderChecker( - context, - handleReleaseSlots(context, applicationId, shuffleId, workerIds, slots, requestId)) - case pb: PbUnregisterShuffle => val applicationId = pb.getAppId val shuffleId = pb.getShuffleId @@ -630,17 +623,6 @@ private[celeborn] class Master( context.reply(RequestSlotsResponse(StatusCode.SUCCESS, slots.asInstanceOf[WorkerResource])) } - def handleReleaseSlots( - context: RpcCallContext, - applicationId: String, - shuffleId: Int, - workerIds: util.List[String], - slots: util.List[util.Map[String, Integer]], - requestId: String): Unit = { - // For compatibility, ignore this message - context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS)) - } - def handleUnregisterShuffle( context: RpcCallContext, applicationId: String, diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 665af980d..aaae7861a 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -316,8 +316,6 @@ public class DefaultMetaSystemSuiteJ { } }); - statusSystem.handleReleaseSlots(SHUFFLEKEY1, workerIds, workerSlots, getNewReqeustId()); - Assert.assertEquals( 0, statusSystem.workers.stream() diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index 3684e095b..1d0e2c17b 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -494,7 +494,6 @@ public class RatisMasterStatusSystemSuiteJ { } }); - statusSystem.handleReleaseSlots(SHUFFLEKEY1, workerIds, workerSlots, getNewReqeustId()); Thread.sleep(3000L); // Do not update diskinfo's activeslots
