This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 70275949f0bb1f51ecd426ed4644e260e6b095bc
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jul 27 17:46:00 2023 +0800

    [CELEBORN-846] Remove unused updateReleaseSlotsMeta in master side
    
    ### What changes were proposed in this pull request?
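    Remove the unused ReleaseSlots / ReleaseSlotsResponse control messages
    (and their protobuf definitions), the master-side handleReleaseSlots /
    updateReleaseSlotsMeta handling, and the LifecycleManager code that sent
    ReleaseSlots requests to the master.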
    
    ### Why are the changes needed?
    
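    CELEBORN-791 removed sending the ReleaseSlotsRequest from the worker, so the
    Master no longer needs to handle it. The Master's handleReleaseSlots already
    ignored the message (it only replied SUCCESS for compatibility), so the
    remaining client-side ReleaseSlots requests were no-ops.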
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1767 from AngersZhuuuu/CELEBORN-846.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  | 40 --------------------
 common/src/main/proto/TransportMessages.proto      | 16 +-------
 .../common/protocol/message/ControlMessages.scala  | 43 ----------------------
 .../master/clustermeta/AbstractMetaManager.java    | 23 ------------
 .../master/clustermeta/IMetadataHandler.java       |  6 ---
 .../clustermeta/SingleMasterMetaManager.java       |  9 -----
 .../master/clustermeta/ha/HAMasterMetaManager.java | 29 ---------------
 .../celeborn/service/deploy/master/Master.scala    | 18 ---------
 .../clustermeta/DefaultMetaSystemSuiteJ.java       |  2 -
 .../ha/RatisMasterStatusSystemSuiteJ.java          |  1 -
 10 files changed, 2 insertions(+), 185 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bc11519fd..00c2c4847 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -456,9 +456,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     if (!reserveSlotsSuccess) {
       logError(s"reserve buffer for $shuffleId failed, reply to all.")
       reply(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED, 
Array.empty))
-      // tell Primary to release slots
-      requestPrimaryReleaseSlots(
-        ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava, 
List.empty.asJava))
     } else {
       logInfo(s"ReserveSlots for $shuffleId success!")
       logDebug(s"Allocated Slots: $slots")
@@ -576,8 +573,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         partitionLocationInfo.removeAllPrimaryPartitions()
         partitionLocationInfo.removeAllReplicaPartitions()
       }
-      requestPrimaryReleaseSlots(
-        ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava, 
List.empty.asJava))
     }
   }
 
@@ -628,13 +623,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       }
     }
 
-    if (shuffleResourceExists(shuffleId)) {
-      logWarning(s"Partition exists for shuffle $shuffleId, " +
-        "maybe caused by task rerun or speculative.")
-      requestPrimaryReleaseSlots(
-        ReleaseSlots(appUniqueId, shuffleId, List.empty.asJava, 
List.empty.asJava))
-    }
-
     // add shuffleKey to delay shuffle removal set
     unregisterShuffleTime.put(shuffleId, System.currentTimeMillis())
 
@@ -762,19 +750,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       destroySlotsWithRetry(shuffleId, destroyResource)
       logInfo(s"Destroyed peer partitions for reserve buffer failed workers " +
         s"shuffleId $shuffleId, $destroyResource")
-
-      val workerIds = new util.ArrayList[String]()
-      val workerSlotsPerDisk = new util.ArrayList[util.Map[String, Integer]]()
-      Utils.getSlotsPerDisk(destroyResource).asScala.foreach {
-        case (workerInfo, slotsPerDisk) =>
-          workerIds.add(workerInfo.toUniqueId())
-          workerSlotsPerDisk.add(slotsPerDisk)
-      }
-      val msg = ReleaseSlots(appUniqueId, shuffleId, workerIds, 
workerSlotsPerDisk)
-      requestPrimaryReleaseSlots(msg)
-      logDebug(s"Released slots for reserve buffer failed workers " +
-        s"${workerIds.asScala.mkString(",")}" + 
s"${slots.asScala.mkString(",")}" +
-        s"shuffleId $shuffleId")
     }
   }
 
@@ -1089,16 +1064,6 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
     }
   }
 
-  private def requestPrimaryReleaseSlots(message: ReleaseSlots): 
ReleaseSlotsResponse = {
-    try {
-      masterClient.askSync[ReleaseSlotsResponse](message, 
classOf[ReleaseSlotsResponse])
-    } catch {
-      case e: Exception =>
-        logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
-        ReleaseSlotsResponse(StatusCode.REQUEST_FAILED)
-    }
-  }
-
   private def requestPrimaryUnregisterShuffle(message: PbUnregisterShuffle)
       : PbUnregisterShuffleResponse = {
     try {
@@ -1125,11 +1090,6 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
     }
   }
 
-  private def shuffleResourceExists(shuffleId: Int): Boolean = {
-    val workerPartitionInfos = workerSnapshots(shuffleId)
-    workerPartitionInfos != null && 
workerPartitionInfos.values().asScala.exists(!_.isEmpty())
-  }
-
   // Once a partition is released, it will be never needed anymore
   def releasePartition(shuffleId: Int, partitionId: Int): Unit = {
     commitManager.releasePartitionResource(shuffleId, partitionId)
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index a1c693c03..ae37c27a8 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -25,8 +25,8 @@ enum MessageType {
   REGISTER_SHUFFLE = 4;
   REGISTER_SHUFFLE_RESPONSE = 5;
   REQUEST_SLOTS = 6;
-  RELEASE_SLOTS = 7;
-  RELEASE_SLOTS_RESPONSE = 8;
+  //  RELEASE_SLOTS = 7;
+  //  RELEASE_SLOTS_RESPONSE = 8;
   REQUEST_SLOTS_RESPONSE = 9;
   REVIVE = 10;
   CHANGE_LOCATION_RESPONSE = 11;
@@ -191,18 +191,6 @@ message PbSlotInfo {
   map<string, int32> slot = 1;
 }
 
-message PbReleaseSlots {
-  string applicationId = 1;
-  int32 shuffleId = 2;
-  repeated string workerIds = 3;
-  repeated PbSlotInfo slots = 4;
-  string requestId = 6;
-}
-
-message PbReleaseSlotsResponse {
-  int32 status = 1;
-}
-
 message PbRequestSlotsResponse {
   int32 status = 1;
   map<string, PbWorkerResource> workerResource = 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index a0ba251a8..df2df24ad 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -169,17 +169,6 @@ object ControlMessages extends Logging {
       override var requestId: String = ZERO_UUID)
     extends MasterRequestMessage
 
-  case class ReleaseSlots(
-      applicationId: String,
-      shuffleId: Int,
-      workerIds: util.List[String],
-      slots: util.List[util.Map[String, Integer]],
-      override var requestId: String = ZERO_UUID)
-    extends MasterRequestMessage
-
-  case class ReleaseSlotsResponse(status: StatusCode)
-    extends MasterMessage
-
   case class RequestSlotsResponse(
       status: StatusCode,
       workerResource: WorkerResource)
@@ -502,23 +491,6 @@ object ControlMessages extends Logging {
         .build().toByteArray
       new TransportMessage(MessageType.REQUEST_SLOTS, payload)
 
-    case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
-      val pbSlots = slots.asScala.map(slot =>
-        PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
-      val payload = PbReleaseSlots.newBuilder()
-        .setApplicationId(applicationId)
-        .setShuffleId(shuffleId)
-        .setRequestId(requestId)
-        .addAllWorkerIds(workerIds)
-        .addAllSlots(pbSlots.asJava)
-        .build().toByteArray
-      new TransportMessage(MessageType.RELEASE_SLOTS, payload)
-
-    case ReleaseSlotsResponse(status) =>
-      val payload = PbReleaseSlotsResponse.newBuilder()
-        .setStatus(status.getValue).build().toByteArray
-      new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)
-
     case RequestSlotsResponse(status, workerResource) =>
       val builder = PbRequestSlotsResponse.newBuilder()
         .setStatus(status.getValue)
@@ -856,21 +828,6 @@ object ControlMessages extends Logging {
           userIdentifier,
           pbRequestSlots.getRequestId)
 
-      case RELEASE_SLOTS_VALUE =>
-        val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
-        val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
-          new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
-        ReleaseSlots(
-          pbReleaseSlots.getApplicationId,
-          pbReleaseSlots.getShuffleId,
-          new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
-          new util.ArrayList[util.Map[String, Integer]](slotsList),
-          pbReleaseSlots.getRequestId)
-
-      case RELEASE_SLOTS_RESPONSE_VALUE =>
-        val pbReleaseSlotsResponse = 
PbReleaseSlotsResponse.parseFrom(message.getPayload)
-        
ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))
-
       case REQUEST_SLOTS_RESPONSE_VALUE =>
         val pbRequestSlotsResponse = 
PbRequestSlotsResponse.parseFrom(message.getPayload)
         RequestSlotsResponse(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 60e75f8c3..8df57405f 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -95,29 +95,6 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
   }
 
-  public void updateReleaseSlotsMeta(String shuffleKey) {
-    updateReleaseSlotsMeta(shuffleKey, null, null);
-  }
-
-  public void updateReleaseSlotsMeta(
-      String shuffleKey, List<String> workerIds, List<Map<String, Integer>> 
slots) {
-    if (workerIds != null && !workerIds.isEmpty()) {
-      for (int i = 0; i < workerIds.size(); i++) {
-        String workerId = workerIds.get(i);
-        WorkerInfo worker = WorkerInfo.fromUniqueId(workerId);
-        for (WorkerInfo w : workers) {
-          if (w.equals(worker)) {
-            Map<String, Integer> slotToRelease = slots.get(i);
-            LOG.info("release slots for worker {}, to release: {}", w, 
slotToRelease);
-            w.releaseSlots(shuffleKey, slotToRelease);
-          }
-        }
-      }
-    } else {
-      workers.forEach(workerInfo -> workerInfo.releaseSlots(shuffleKey));
-    }
-  }
-
   public void updateUnregisterShuffleMeta(String shuffleKey) {
     registeredShuffle.remove(shuffleKey);
   }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 84b654653..6c4c65a73 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -32,12 +32,6 @@ public interface IMetadataHandler {
       Map<String, Map<String, Integer>> workerToAllocatedSlots,
       String requestId);
 
-  void handleReleaseSlots(
-      String shuffleKey,
-      List<String> workerIds,
-      List<Map<String, Integer>> slotStrings,
-      String requestId);
-
   void handleUnRegisterShuffle(String shuffleKey, String requestId);
 
   void handleAppHeartbeat(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 50d8a66b2..3d12db8b4 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -53,15 +53,6 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
     updateRequestSlotsMeta(shuffleKey, hostName, workerToAllocatedSlots);
   }
 
-  @Override
-  public void handleReleaseSlots(
-      String shuffleKey,
-      List<String> workerIds,
-      List<Map<String, Integer>> slotStrings,
-      String requestId) {
-    updateReleaseSlotsMeta(shuffleKey, workerIds, slotStrings);
-  }
-
   @Override
   public void handleUnRegisterShuffle(String shuffleKey, String requestId) {
     updateUnregisterShuffleMeta(shuffleKey);
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 352259a2b..f7a10013c 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -85,35 +85,6 @@ public class HAMasterMetaManager extends AbstractMetaManager 
{
     }
   }
 
-  @Override
-  public void handleReleaseSlots(
-      String shuffleKey,
-      List<String> workerIds,
-      List<Map<String, Integer>> slots,
-      String requestId) {
-    try {
-      ratisServer.submitRequest(
-          ResourceRequest.newBuilder()
-              .setCmdType(Type.ReleaseSlots)
-              .setRequestId(requestId)
-              .setReleaseSlotsRequest(
-                  ResourceProtos.ReleaseSlotsRequest.newBuilder()
-                      .setShuffleKey(shuffleKey)
-                      .addAllWorkerIds(workerIds)
-                      .addAllSlots(
-                          slots.stream()
-                              .map(
-                                  slot ->
-                                      
ResourceProtos.SlotInfo.newBuilder().putAllSlot(slot).build())
-                              .collect(Collectors.toList()))
-                      .build())
-              .build());
-    } catch (CelebornRuntimeException e) {
-      LOG.error("Handle release slots for {} failed!", shuffleKey, e);
-      throw e;
-    }
-  }
-
   @Override
   public void handleUnRegisterShuffle(String shuffleKey, String requestId) {
     try {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 690fd30b8..3c34b4ab2 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -305,13 +305,6 @@ private[celeborn] class Master(
       logTrace(s"Received RequestSlots request $requestSlots.")
       executeWithLeaderChecker(context, handleRequestSlots(context, 
requestSlots))
 
-    case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
-      logTrace(s"Received ReleaseSlots request $requestId, $applicationId, 
$shuffleId," +
-        s"workers ${workerIds.asScala.mkString(",")}, slots 
${slots.asScala.mkString(",")}")
-      executeWithLeaderChecker(
-        context,
-        handleReleaseSlots(context, applicationId, shuffleId, workerIds, 
slots, requestId))
-
     case pb: PbUnregisterShuffle =>
       val applicationId = pb.getAppId
       val shuffleId = pb.getShuffleId
@@ -630,17 +623,6 @@ private[celeborn] class Master(
     context.reply(RequestSlotsResponse(StatusCode.SUCCESS, 
slots.asInstanceOf[WorkerResource]))
   }
 
-  def handleReleaseSlots(
-      context: RpcCallContext,
-      applicationId: String,
-      shuffleId: Int,
-      workerIds: util.List[String],
-      slots: util.List[util.Map[String, Integer]],
-      requestId: String): Unit = {
-    // For compatibility, ignore this message
-    context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
-  }
-
   def handleUnregisterShuffle(
       context: RpcCallContext,
       applicationId: String,
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 665af980d..aaae7861a 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -316,8 +316,6 @@ public class DefaultMetaSystemSuiteJ {
           }
         });
 
-    statusSystem.handleReleaseSlots(SHUFFLEKEY1, workerIds, workerSlots, 
getNewReqeustId());
-
     Assert.assertEquals(
         0,
         statusSystem.workers.stream()
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 3684e095b..1d0e2c17b 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -494,7 +494,6 @@ public class RatisMasterStatusSystemSuiteJ {
           }
         });
 
-    statusSystem.handleReleaseSlots(SHUFFLEKEY1, workerIds, workerSlots, 
getNewReqeustId());
     Thread.sleep(3000L);
 
     // Do not update diskinfo's activeslots

Reply via email to