This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new df01fadc9 [CELEBORN-1601] Support revise lost shuffles
df01fadc9 is described below

commit df01fadc9fa7dd9c2781105c82962dbb8f096603
Author: mingji <[email protected]>
AuthorDate: Mon Oct 21 16:44:37 2024 +0800

    [CELEBORN-1601] Support revise lost shuffles
    
    ### What changes were proposed in this pull request?
    To support revising lost shuffle IDs in long-running jobs, such as Flink batch jobs.
    
    ### Why are the changes needed?
    1. To support revising lost shuffles, i.e. re-registering shuffle IDs that the client still holds but the master no longer records.
    2. To add an HTTP endpoint to revise lost shuffles manually.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    Cluster tests.
    
    Closes #2746 from FMX/b1600.
    
    Lead-authored-by: mingji <[email protected]>
    Co-authored-by: Ethan Feng <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/cli/common/CommonOptions.scala |   7 +
 .../apache/celeborn/cli/master/MasterOptions.scala |  10 ++
 .../celeborn/cli/master/MasterSubcommand.scala     |   7 +
 .../celeborn/cli/master/MasterSubcommandImpl.scala |  18 +++
 .../cli/master/ReviseLostShuffleOptions.scala      |  29 +++++
 .../celeborn/cli/TestCelebornCliCommands.scala     |  26 ++++
 .../celeborn/client/ApplicationHeartbeater.scala   |  38 +++++-
 .../apache/celeborn/client/LifecycleManager.scala  |   3 +-
 .../celeborn/client/WorkerStatusTrackerSuite.scala |   3 +-
 common/src/main/proto/TransportMessages.proto      |  14 ++
 .../org/apache/celeborn/common/CelebornConf.scala  |   9 ++
 .../common/protocol/message/ControlMessages.scala  |  36 +++++-
 .../apache/celeborn/common/util/PbSerDeUtils.scala |   6 +-
 docs/configuration/client.md                       |   1 +
 .../master/clustermeta/AbstractMetaManager.java    |  73 ++++++++---
 .../master/clustermeta/IMetadataHandler.java       |   2 +
 .../clustermeta/SingleMasterMetaManager.java       |   5 +
 .../master/clustermeta/ha/HAMasterMetaManager.java |  19 +++
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  10 ++
 master/src/main/proto/Resource.proto               |   8 ++
 .../celeborn/service/deploy/master/Master.scala    |  51 +++++++-
 .../deploy/master/http/api/ApiMasterResource.scala |   1 +
 .../master/http/api/v1/ApplicationResource.scala   |  43 +++++-
 .../master/http/api/v1/ShuffleResource.scala       |  10 +-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       |  14 +-
 .../ha/RatisMasterStatusSystemSuiteJ.java          |  61 +++++----
 .../celeborn/rest/v1/master/ApplicationApi.java    | 144 +++++++++++++++++++++
 .../celeborn/rest/v1/model/ContainerInfo.java      |   2 +-
 .../src/main/openapi3/master_rest_v1.yaml          |  46 +++++++
 .../celeborn/server/common/HttpService.scala       |   4 +
 .../LifecycleManagerUnregisterShuffleSuite.scala   |  19 +--
 .../celeborn/service/deploy/worker/Worker.scala    |   5 +
 32 files changed, 647 insertions(+), 77 deletions(-)

diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
index 507d5374f..69c243f94 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
@@ -71,4 +71,11 @@ class CommonOptions {
     paramLabel = "username",
     description = Array("The username of the TENANT_USER level."))
   private[cli] var configName: String = _
+
+  @Option(
+    names = Array("--apps"),
+    paramLabel = "appId",
+    description = Array("The application Id list seperated by comma."))
+  private[cli] var apps: String = _
+
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
index d69a2e6bf..840a54d23 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -110,4 +110,14 @@ final class MasterOptions {
     names = Array("--remove-workers-unavailable-info"),
     description = Array("Remove the workers unavailable info from the 
master."))
   private[master] var removeWorkersUnavailableInfo: Boolean = _
+
+  @Option(
+    names = Array("--revise-lost-shuffles"),
+    description = Array("Revise lost shuffles or remove shuffles for an 
application."))
+  private[master] var reviseLostShuffles: Boolean = _
+
+  @Option(
+    names = Array("--delete-apps"),
+    description = Array("Delete resource of an application."))
+  private[master] var deleteApps: Boolean = _
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
index 222650396..a875621e2 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -37,6 +37,9 @@ trait MasterSubcommand extends CliLogging {
   @ArgGroup(exclusive = true, multiplicity = "1")
   private[master] var masterOptions: MasterOptions = _
 
+  @ArgGroup(exclusive = false)
+  private[master] var reviseLostShuffleOptions: ReviseLostShuffleOptions = _
+
   @Mixin
   private[master] var commonOptions: CommonOptions = _
 
@@ -110,4 +113,8 @@ trait MasterSubcommand extends CliLogging {
 
   private[master] def runShowThreadDump: ThreadStackResponse
 
+  private[master] def reviseLostShuffles: HandleResponse
+
+  private[master] def deleteApps: HandleResponse
+
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index 32f540f4b..70bd32c63 100644
--- 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -51,6 +51,8 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     if (masterOptions.showContainerInfo) log(runShowContainerInfo)
     if (masterOptions.showDynamicConf) log(runShowDynamicConf)
     if (masterOptions.showThreadDump) log(runShowThreadDump)
+    if (masterOptions.reviseLostShuffles) log(reviseLostShuffles)
+    if (masterOptions.deleteApps) log(deleteApps)
     if (masterOptions.addClusterAlias != null && 
masterOptions.addClusterAlias.nonEmpty)
       runAddClusterAlias
     if (masterOptions.removeClusterAlias != null && 
masterOptions.removeClusterAlias.nonEmpty)
@@ -220,4 +222,20 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
   }
 
   private[master] def runShowContainerInfo: ContainerInfo = 
defaultApi.getContainerInfo
+
+  override private[master] def reviseLostShuffles: HandleResponse = {
+    val app = commonOptions.apps
+    if (app.contains(",")) {
+      throw new ParameterException(
+        spec.commandLine(),
+        "Only one application id can be provided for this command.")
+    }
+    val shuffleIds = reviseLostShuffleOptions.shuffleIds
+    applicationApi.reviseLostShuffles(app, shuffleIds)
+  }
+
+  override private[master] def deleteApps: HandleResponse = {
+    val apps = commonOptions.apps
+    applicationApi.deleteApps(apps)
+  }
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala
 
b/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala
new file mode 100644
index 000000000..a0c4963fe
--- /dev/null
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import picocli.CommandLine.Option
+
+final class ReviseLostShuffleOptions {
+
+  @Option(
+    names = Array("--shuffleIds"),
+    description = Array("The shuffle ids to manipulate."))
+  private[master] var shuffleIds: String = _
+
+}
diff --git 
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala 
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index 56b4ea213..1f450a19f 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -247,6 +247,32 @@ class TestCelebornCliCommands extends CelebornFunSuite 
with MiniClusterFeature {
     captureOutputAndValidateResponse(args, "success: true")
   }
 
+  test("master --delete-apps case1") {
+    val args = prepareMasterArgs() ++ Array(
+      "--delete-apps",
+      "--apps",
+      "app1")
+    captureOutputAndValidateResponse(args, "success: true")
+  }
+
+  test("master --delete-apps case2") {
+    val args = prepareMasterArgs() ++ Array(
+      "--delete-apps",
+      "--apps",
+      "app1,app2")
+    captureOutputAndValidateResponse(args, "success: true")
+  }
+
+  test("master --revise-lost-shuffles case1") {
+    val args = prepareMasterArgs() ++ Array(
+      "--revise-lost-shuffles",
+      "--apps",
+      "app1",
+      "--shuffleIds",
+      "1,2,3,4,5,6")
+    captureOutputAndValidateResponse(args, "success: true")
+  }
+
   private def prepareMasterArgs(): Array[String] = {
     Array(
       "master",
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index d3af38183..0b850ff52 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -17,14 +17,19 @@
 
 package org.apache.celeborn.client
 
-import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util
+import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
+import java.util.function.Consumer
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.internal.Logging
-import 
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, 
ApplicationLostResponse, HeartbeatFromApplication, 
HeartbeatFromApplicationResponse, ZERO_UUID}
+import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, 
ApplicationLostResponse, HeartbeatFromApplication, 
HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID}
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.util.{ThreadUtils, Utils}
 
@@ -33,9 +38,11 @@ class ApplicationHeartbeater(
     conf: CelebornConf,
     masterClient: MasterClient,
     shuffleMetrics: () => (Long, Long),
-    workerStatusTracker: WorkerStatusTracker) extends Logging {
+    workerStatusTracker: WorkerStatusTracker,
+    registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean]) 
extends Logging {
 
   private var stopped = false
+  private val reviseLostShuffles = conf.reviseLostShufflesEnabled
 
   // Use independent app heartbeat threads to avoid being blocked by other 
operations.
   private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
@@ -68,6 +75,30 @@ class ApplicationHeartbeater(
             if (response.statusCode == StatusCode.SUCCESS) {
               logDebug("Successfully send app heartbeat.")
               workerStatusTracker.handleHeartbeatResponse(response)
+              // revise shuffle id if there are lost shuffles
+              if (reviseLostShuffles) {
+                val masterRecordedShuffleIds = response.registeredShuffles
+                val localOnlyShuffles = new util.ArrayList[Integer]()
+                registeredShuffles.forEach(new Consumer[Int] {
+                  override def accept(key: Int): Unit = {
+                    localOnlyShuffles.add(key)
+                  }
+                })
+                localOnlyShuffles.removeAll(masterRecordedShuffleIds)
+                if (!localOnlyShuffles.isEmpty) {
+                  logWarning(
+                    s"There are lost shuffle found 
${StringUtils.join(localOnlyShuffles, ",")}, revise lost shuffles.")
+                  val reviseLostShufflesResponse = masterClient.askSync(
+                    ReviseLostShuffles.apply(appId, localOnlyShuffles, 
MasterClient.genRequestId()),
+                    classOf[PbReviseLostShufflesResponse])
+                  if (!reviseLostShufflesResponse.getSuccess) {
+                    logWarning(
+                      s"Revise lost shuffles failed. Error message 
:${reviseLostShufflesResponse.getMessage}")
+                  } else {
+                    logInfo("Revise lost shuffles succeed.")
+                  }
+                }
+              }
             }
           } catch {
             case it: InterruptedException =>
@@ -97,6 +128,7 @@ class ApplicationHeartbeater(
           StatusCode.REQUEST_FAILED,
           List.empty.asJava,
           List.empty.asJava,
+          List.empty.asJava,
           List.empty.asJava)
     }
   }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c4755fbf1..d35802958 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -210,7 +210,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       conf,
       masterClient,
       () => commitManager.commitMetrics(),
-      workerStatusTracker)
+      workerStatusTracker,
+      registeredShuffle)
   private val changePartitionManager = new ChangePartitionManager(conf, this)
   private val releasePartitionManager = new ReleasePartitionManager(conf, this)
 
diff --git 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index 1f606dd32..d2c21245d 100644
--- 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++ 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -95,7 +95,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
       StatusCode.SUCCESS,
       excludedWorkers,
       unknownWorkers,
-      shuttingWorkers)
+      shuttingWorkers,
+      new util.ArrayList[Integer]())
   }
 
   private def mockWorkers(workerHosts: Array[String]): 
util.ArrayList[WorkerInfo] = {
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 7d5bb2c5e..bc8da08ab 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -109,6 +109,8 @@ enum MessageType {
   NOTIFY_REQUIRED_SEGMENT = 86;
   BATCH_UNREGISTER_SHUFFLES = 87;
   BATCH_UNREGISTER_SHUFFLE_RESPONSE= 88;
+  REVISE_LOST_SHUFFLES = 89;
+  REVISE_LOST_SHUFFLES_RESPONSE = 90;
 }
 
 enum StreamType {
@@ -447,6 +449,7 @@ message PbHeartbeatFromApplicationResponse {
   repeated PbWorkerInfo excludedWorkers = 2;
   repeated PbWorkerInfo unknownWorkers = 3;
   repeated PbWorkerInfo shuttingWorkers = 4;
+  repeated int32 registeredShuffles = 5;
 }
 
 message PbCheckQuota {
@@ -856,3 +859,14 @@ message PbReportWorkerDecommission {
   repeated PbWorkerInfo workers = 1;
   string requestId = 2;
 }
+
+message PbReviseLostShuffles{
+  string appId = 1;
+  repeated int32 lostShuffles = 2;
+  string requestId = 3;
+}
+
+message PbReviseLostShufflesResponse{
+  bool success = 1;
+  string message = 2;
+}
\ No newline at end of file
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index bf1af9b8a..3ec230654 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1071,6 +1071,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
 
   def registerShuffleFilterExcludedWorkerEnabled: Boolean =
     get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
+  def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)
 
   // //////////////////////////////////////////////////////
   //                       Worker                        //
@@ -5636,6 +5637,14 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val REVISE_LOST_SHUFFLES_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.shuffle.reviseLostShuffles.enabled")
+      .categories("client")
+      .version("0.6.0")
+      .doc("Whether to revise lost shuffles.")
+      .booleanConf
+      .createWithDefault(false)
+
   val NETWORK_IO_SASL_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.<module>.io.saslTimeout")
       .categories("network")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index bf4e7f269..d0eb85b02 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -335,6 +335,26 @@ object ControlMessages extends Logging {
         .build()
   }
 
+  object ReviseLostShuffles {
+    def apply(
+        appId: String,
+        lostShuffles: java.util.List[Integer],
+        requestId: String): PbReviseLostShuffles =
+      PbReviseLostShuffles.newBuilder()
+        .setAppId(appId)
+        .addAllLostShuffles(lostShuffles)
+        .setRequestId(requestId)
+        .build()
+  }
+
+  object ReviseLostShufflesResponse {
+    def apply(success: Boolean, message: String): PbReviseLostShufflesResponse 
=
+      PbReviseLostShufflesResponse.newBuilder()
+        .setSuccess(success)
+        .setMessage(message)
+        .build()
+  }
+
   case class StageEnd(shuffleId: Int) extends MasterMessage
 
   case class StageEndResponse(status: StatusCode)
@@ -400,7 +420,8 @@ object ControlMessages extends Logging {
       statusCode: StatusCode,
       excludedWorkers: util.List[WorkerInfo],
       unknownWorkers: util.List[WorkerInfo],
-      shuttingWorkers: util.List[WorkerInfo]) extends Message
+      shuttingWorkers: util.List[WorkerInfo],
+      registeredShuffles: util.List[Integer]) extends Message
 
   case class CheckQuota(userIdentifier: UserIdentifier) extends Message
 
@@ -565,6 +586,12 @@ object ControlMessages extends Logging {
     case pb: PbReportShuffleFetchFailureResponse =>
       new TransportMessage(MessageType.REPORT_SHUFFLE_FETCH_FAILURE_RESPONSE, 
pb.toByteArray)
 
+    case pb: PbReviseLostShuffles =>
+      new TransportMessage(MessageType.REVISE_LOST_SHUFFLES, pb.toByteArray)
+
+    case pb: PbReviseLostShufflesResponse =>
+      new TransportMessage(MessageType.REVISE_LOST_SHUFFLES_RESPONSE, 
pb.toByteArray)
+
     case pb: PbReportBarrierStageAttemptFailure =>
       new TransportMessage(MessageType.REPORT_BARRIER_STAGE_ATTEMPT_FAILURE, 
pb.toByteArray)
 
@@ -799,7 +826,8 @@ object ControlMessages extends Logging {
           statusCode,
           excludedWorkers,
           unknownWorkers,
-          shuttingWorkers) =>
+          shuttingWorkers,
+          registeredShuffles) =>
       val payload = PbHeartbeatFromApplicationResponse.newBuilder()
         .setStatus(statusCode.getValue)
         .addAllExcludedWorkers(
@@ -808,6 +836,7 @@ object ControlMessages extends Logging {
           unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, 
true)).toList.asJava)
         .addAllShuttingWorkers(
           shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, 
true)).toList.asJava)
+        .addAllRegisteredShuffles(registeredShuffles)
         .build().toByteArray
       new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE, 
payload)
 
@@ -1191,7 +1220,8 @@ object ControlMessages extends Logging {
           pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
             .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
           pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
-            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
+          pbHeartbeatFromApplicationResponse.getRegisteredShufflesList)
 
       case CHECK_QUOTA_VALUE =>
         val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 30f00f179..f9f01b9a7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -450,7 +450,7 @@ object PbSerDeUtils {
 
   def toPbSnapshotMetaInfo(
       estimatedPartitionSize: java.lang.Long,
-      registeredShuffle: java.util.Set[String],
+      registeredShuffle: java.util.Map[String, java.util.Set[Integer]],
       hostnameSet: java.util.Set[String],
       excludedWorkers: java.util.Set[WorkerInfo],
       manuallyExcludedWorkers: java.util.Set[WorkerInfo],
@@ -468,7 +468,9 @@ object PbSerDeUtils {
       decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
     val builder = PbSnapshotMetaInfo.newBuilder()
       .setEstimatedPartitionSize(estimatedPartitionSize)
-      .addAllRegisteredShuffle(registeredShuffle)
+      .addAllRegisteredShuffle(registeredShuffle.asScala.flatMap { 
appIdAndShuffleId =>
+        appIdAndShuffleId._2.asScala.map(i => 
Utils.makeShuffleKey(appIdAndShuffleId._1, i))
+      }.asJava)
       .addAllHostnameSet(hostnameSet)
       .addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_, 
true, false)).asJava)
       .addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 91d1faac5..ce309a13f 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -103,6 +103,7 @@ license: |
 | celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file 
size threshold, if file size exceeds this, trigger split. | 0.3.0 | 
celeborn.shuffle.partitionSplit.threshold | 
 | celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark 
application have skewed partition, this value can set to true to improve 
performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled | 
 | celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | 
false | Whether to filter excluded worker when register shuffle. | 0.4.0 |  | 
+| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether 
to revise lost shuffles. | 0.6.0 |  | 
 | celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that 
slots of one shuffle can be allocated on. Will choose the smaller positive one 
from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. 
| 0.3.1 |  | 
 | celeborn.client.spark.fetch.throwsFetchFailure | false | false | client 
throws FetchFailedException instead of CelebornIOException | 0.4.0 |  | 
 | celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | 
Whether to dynamically switch push write mode based on conditions.If true, 
shuffle mode will be only determined by partition count | 0.5.0 |  | 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 1631e90d3..5c36dd23f 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -22,10 +22,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -33,6 +30,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
 import scala.Option;
+import scala.Tuple2;
 
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -63,7 +61,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractMetaManager.class);
 
   // Metadata for master service
-  public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
+  public final Map<String, Set<Integer>> registeredAppAndShuffles =
+      JavaUtils.newConcurrentHashMap();
   public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
   public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();
 
@@ -92,9 +91,12 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
 
   public void updateRequestSlotsMeta(
       String shuffleKey, String hostName, Map<String, Map<String, Integer>> 
workerWithAllocations) {
-    registeredShuffle.add(shuffleKey);
+    Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
+    registeredAppAndShuffles
+        .computeIfAbsent(appIdShuffleId._1(), v -> new HashSet<>())
+        .add((Integer) appIdShuffleId._2);
 
-    String appId = Utils.splitShuffleKey(shuffleKey)._1;
+    String appId = appIdShuffleId._1;
     appHeartbeatTime.compute(
         appId,
         (applicationId, oldTimestamp) -> {
@@ -111,11 +113,29 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   }
 
   public void updateUnregisterShuffleMeta(String shuffleKey) {
-    registeredShuffle.remove(shuffleKey);
+    Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
+    Set<Integer> shuffleIds = 
registeredAppAndShuffles.get(appIdShuffleId._1());
+    if (shuffleIds != null) {
+      shuffleIds.remove(appIdShuffleId._2);
+      registeredAppAndShuffles.compute(
+          appIdShuffleId._1(),
+          (s, shuffles) -> {
+            if (shuffles.size() == 0) {
+              return null;
+            }
+            return shuffles;
+          });
+    }
   }
 
   public void updateBatchUnregisterShuffleMeta(List<String> shuffleKeys) {
-    registeredShuffle.removeAll(shuffleKeys);
+    for (String shuffleKey : shuffleKeys) {
+      Tuple2<String, Object> appIdShuffleId = 
Utils.splitShuffleKey(shuffleKey);
+      String appId = appIdShuffleId._1;
+      if (registeredAppAndShuffles.containsKey(appId)) {
+        registeredAppAndShuffles.get(appId).remove(appIdShuffleId._2);
+      }
+    }
   }
 
   public void updateAppHeartbeatMeta(String appId, long time, long 
totalWritten, long fileCount) {
@@ -125,7 +145,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   }
 
   public void updateAppLostMeta(String appId) {
-    registeredShuffle.removeIf(shuffleKey -> shuffleKey.startsWith(appId));
+    registeredAppAndShuffles.remove(appId);
     appHeartbeatTime.remove(appId);
     applicationMetas.remove(appId);
   }
@@ -136,6 +156,14 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     workersToRemove.forEach(manuallyExcludedWorkers::remove);
   }
 
+  public void reviseLostShuffles(String appId, List<Integer> lostShuffles) {
+    registeredAppAndShuffles.computeIfAbsent(appId, v -> new 
HashSet<>()).addAll(lostShuffles);
+  }
+
+  public void deleteApp(String appId) {
+    registeredAppAndShuffles.remove(appId);
+  }
+
   public void updateWorkerLostMeta(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort) {
     WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort);
@@ -280,7 +308,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     byte[] snapshotBytes =
         PbSerDeUtils.toPbSnapshotMetaInfo(
                 estimatedPartitionSize,
-                registeredShuffle,
+                registeredAppAndShuffles,
                 hostnameSet,
                 excludedWorkers,
                 manuallyExcludedWorkers,
@@ -313,7 +341,12 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
 
       estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
 
-      registeredShuffle.addAll(snapshotMetaInfo.getRegisteredShuffleList());
+      for (String shuffleKey : snapshotMetaInfo.getRegisteredShuffleList()) {
+        Tuple2<String, Object> appIdShuffleId = 
Utils.splitShuffleKey(shuffleKey);
+        registeredAppAndShuffles
+            .computeIfAbsent(appIdShuffleId._1, v -> new HashSet<>())
+            .add((Integer) appIdShuffleId._2);
+      }
       hostnameSet.addAll(snapshotMetaInfo.getHostnameSetList());
       excludedWorkers.addAll(
           snapshotMetaInfo.getExcludedWorkersList().stream()
@@ -329,9 +362,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
               .collect(Collectors.toSet()));
       appHeartbeatTime.putAll(snapshotMetaInfo.getAppHeartbeatTimeMap());
 
-      registeredShuffle.forEach(
-          shuffleKey -> {
-            String appId = Utils.splitShuffleKey(shuffleKey)._1;
+      registeredAppAndShuffles.forEach(
+          (appId, shuffleId) -> {
             if (!appHeartbeatTime.containsKey(appId)) {
               appHeartbeatTime.put(appId, System.currentTimeMillis());
             }
@@ -405,15 +437,16 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     LOG.info(
         "Worker size: {}, Registered shuffle size: {}. Worker excluded list 
size: {}. Manually Excluded list size: {}",
         workers.size(),
-        registeredShuffle.size(),
+        registeredAppAndShuffles.size(),
         excludedWorkers.size(),
         manuallyExcludedWorkers.size());
     workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
-    registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}", 
shuffle));
+    registeredAppAndShuffles.forEach(
+        (appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId, 
shuffleId));
   }
 
   private void cleanUpState() {
-    registeredShuffle.clear();
+    registeredAppAndShuffles.clear();
     hostnameSet.clear();
     workers.clear();
     lostWorkers.clear();
@@ -504,4 +537,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public void updateApplicationMeta(ApplicationMeta applicationMeta) {
     applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
   }
+
+  public int registeredShuffleCount() {
+    return 
registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
+  }
 }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 7eb72679e..9e17728d6 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -46,6 +46,8 @@ public interface IMetadataHandler {
   void handleWorkerExclude(
       List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String 
requestId);
 
+  /**
+   * Handles a request to revise lost shuffles of an application, after which the given shuffle
+   * ids count as registered shuffles of {@code appId}.
+   *
+   * @param appId application owning the shuffles
+   * @param shuffles shuffle ids to revise
+   * @param requestId id of this request
+   */
+  void handleReviseLostShuffles(String appId, List<Integer> shuffles, String 
requestId);
+
   void handleWorkerLost(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId);
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index b311b1b45..39bac5330 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -87,6 +87,11 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
     updateWorkerExcludeMeta(workersToAdd, workersToRemove);
   }
 
+  /**
+   * Applies the revision locally by delegating to {@code reviseLostShuffles}; {@code requestId}
+   * is not used by this implementation.
+   */
+  @Override
+  public void handleReviseLostShuffles(String appId, List<Integer> shuffles, String requestId) {
+    this.reviseLostShuffles(appId, shuffles);
+  }
+
   @Override
   public void handleWorkerLost(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index d6c95afd4..1d5117991 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -194,6 +194,25 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
     }
   }
 
+  /**
+   * Submits a {@code ReviseLostShuffles} resource request to the Ratis server.
+   *
+   * @param appId application owning the shuffles
+   * @param shuffles shuffle ids to revise
+   * @param requestId id attached to the submitted request
+   * @throws CelebornRuntimeException if the submission fails; the error is logged and rethrown
+   */
+  @Override
+  public void handleReviseLostShuffles(String appId, List<Integer> shuffles, String requestId) {
+    ResourceProtos.ReviseLostShufflesRequest reviseRequest =
+        ResourceProtos.ReviseLostShufflesRequest.newBuilder()
+            .setAppId(appId)
+            .addAllLostShuffles(shuffles)
+            .build();
+    ResourceRequest resourceRequest =
+        ResourceRequest.newBuilder()
+            .setCmdType(Type.ReviseLostShuffles)
+            .setRequestId(requestId)
+            .setReviseLostShufflesRequest(reviseRequest)
+            .build();
+    try {
+      ratisServer.submitRequest(resourceRequest);
+    } catch (CelebornRuntimeException e) {
+      LOG.error("Handle revise lost shuffle failed!", e);
+      throw e;
+    }
+  }
+
   @Override
   public void handleWorkerLost(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index a7948dde7..4ec1dac37 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,7 +102,16 @@ public class MetaHandler {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption;
       Map<String, Long> estimatedAppDiskUsage = new HashMap<>();
       WorkerStatus workerStatus;
+      List<Integer> lostShuffles;
       switch (cmdType) {
+        case ReviseLostShuffles:
+          appId = request.getReviseLostShufflesRequest().getAppId();
+          lostShuffles = 
request.getReviseLostShufflesRequest().getLostShufflesList();
+          LOG.info(
+              "Handle revise lost shuffles for {} {}", appId, 
StringUtils.join(lostShuffles, ","));
+          metaSystem.reviseLostShuffles(appId, lostShuffles);
+          break;
+
         case RequestSlots:
           shuffleKey = request.getRequestSlotsRequest().getShuffleKey();
           LOG.debug("Handle request slots for {}", shuffleKey);
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index b83599409..acb1d6097 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -41,6 +41,8 @@ enum Type {
   ApplicationMeta = 26;
   ReportWorkerDecommission = 27;
   BatchUnRegisterShuffle = 28;
+
+  ReviseLostShuffles = 29;
 }
 
 enum WorkerEventType {
@@ -75,6 +77,7 @@ message ResourceRequest {
   optional ApplicationMetaRequest applicationMetaRequest = 23;
   optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 
24;
   optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25;
+  optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102;
 }
 
 message DiskInfo {
@@ -243,3 +246,8 @@ message ApplicationMetaRequest {
   required string appId = 1;
   optional string secret = 2;
 }
+
+// Payload of a ReviseLostShuffles resource request.
+message ReviseLostShufflesRequest {
+  // Application owning the lost shuffles.
+  required string appId = 1;
+  // Shuffle ids of appId to revise.
+  repeated int32 lostShuffles = 2;
+}
\ No newline at end of file
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ec6e550bb..86baaa921 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
 import java.io.IOException
 import java.net.BindException
 import java.util
+import java.util.Collections
 import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.function.ToLongFunction
@@ -224,7 +225,7 @@ private[celeborn] class Master(
 
   private var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _
   masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
-    statusSystem.registeredShuffle.size
+    statusSystem.registeredShuffleCount
   }
   masterSource.addGauge(MasterSource.WORKER_COUNT) { () => 
statusSystem.workers.size }
   masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () => 
statusSystem.lostWorkers.size }
@@ -530,6 +531,11 @@ private[celeborn] class Master(
         context,
         handleWorkerDecommission(context, workers, requestId))
 
+    case pb: PbReviseLostShuffles =>
+      executeWithLeaderChecker(
+        context,
+        handleReviseLostShuffle(context, pb.getAppId, pb.getLostShufflesList, 
pb.getRequestId))
+
     case pb: PbWorkerExclude =>
       val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList
         .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
@@ -684,7 +690,11 @@ private[celeborn] class Master(
 
     val expiredShuffleKeys = new util.HashSet[String]
     activeShuffleKeys.asScala.foreach { shuffleKey =>
-      if (!statusSystem.registeredShuffle.contains(shuffleKey)) {
+      val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
+      val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId)
+      if (shuffleIds == null || !shuffleIds.contains(shuffleId)) {
+        logWarning(
+          s"Shuffle $shuffleKey expired on 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
         expiredShuffleKeys.add(shuffleKey)
       }
     }
@@ -704,6 +714,23 @@ private[celeborn] class Master(
     }
   }
 
+  /**
+   * Handles a revise-lost-shuffles request by forwarding it to
+   * `statusSystem.handleReviseLostShuffles` and replying with a [[ReviseLostShufflesResponse]].
+   *
+   * @param context      RPC context to reply to; may be null, in which case no reply is sent and
+   *                     failures are rethrown to the caller instead
+   * @param appId        application owning the lost shuffles
+   * @param lostShuffles shuffle ids to revise
+   * @param requestId    request id forwarded to the status system
+   */
+  private def handleReviseLostShuffle(
+      context: RpcCallContext,
+      appId: String,
+      lostShuffles: java.util.List[Integer],
+      requestId: String): Unit = {
+    try {
+      logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ")
+      statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId)
+      if (context != null) {
+        context.reply(ReviseLostShufflesResponse(true, ""))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to revise lost shuffles for $appId", e)
+        // The success path tolerates a null context, so the failure path must too: replying on a
+        // null context would raise an NPE that hides the real error. Without a context there is
+        // nobody to reply to, so surface the original failure instead.
+        if (context != null) {
+          context.reply(ReviseLostShufflesResponse(false, e.getMessage))
+        } else {
+          throw e
+        }
+    }
+  }
+
   private def handleWorkerExclude(
       context: RpcCallContext,
       workersToAdd: util.List[WorkerInfo],
@@ -1099,13 +1126,16 @@ private[celeborn] class Master(
     if (shouldResponse) {
       // UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
       // during serialization of HeartbeatFromApplicationResponse
+      var appRelatedShuffles =
+        statusSystem.registeredAppAndShuffles.getOrDefault(appId, 
Collections.emptySet())
       context.reply(HeartbeatFromApplicationResponse(
         StatusCode.SUCCESS,
         new util.ArrayList(
           (statusSystem.excludedWorkers.asScala ++ 
statusSystem.manuallyExcludedWorkers.asScala).asJava),
         needCheckedWorkerList,
         new util.ArrayList[WorkerInfo](
-          (statusSystem.shutdownWorkers.asScala ++ 
statusSystem.decommissionWorkers.asScala).asJava)))
+          (statusSystem.shutdownWorkers.asScala ++ 
statusSystem.decommissionWorkers.asScala).asJava),
+        new util.ArrayList(appRelatedShuffles)))
     } else {
       context.reply(OneWayMessageResponse)
     }
@@ -1339,8 +1369,11 @@ private[celeborn] class Master(
   override def getShuffleList: String = {
     val sb = new StringBuilder
     sb.append("======================= Shuffle Key List 
============================\n")
-    statusSystem.registeredShuffle.asScala.foreach { shuffleKey =>
-      sb.append(s"$shuffleKey\n")
+    // Emit one "<appId>-<shuffleId>" line per registered shuffle.
+    statusSystem.registeredAppAndShuffles.asScala.foreach { case (appId, shuffleIds) =>
+      shuffleIds.asScala.foreach(shuffleId => sb.append(s"$appId-$shuffleId\n"))
     }
     sb.toString()
   }
@@ -1442,6 +1475,14 @@ private[celeborn] class Master(
     }
   }
 
+  /**
+   * Revises lost shuffles of `appId` on behalf of the HTTP API.
+   *
+   * This goes through `statusSystem.handleReviseLostShuffles`, like the RPC path, rather than
+   * calling `statusSystem.reviseLostShuffles` directly. In HA mode the direct call would only
+   * mutate this node's metadata. The handler instead submits the change through Ratis, so every
+   * master applies it.
+   *
+   * Failures raised by the status system (e.g. a failed Ratis submission) propagate to the caller.
+   */
+  override def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]): Unit = {
+    statusSystem.handleReviseLostShuffles(
+      appId,
+      shuffles,
+      org.apache.celeborn.common.client.MasterClient.genRequestId())
+  }
+
+  /**
+   * Deletes every application listed in `appIds` through the status system.
+   *
+   * @param appIds comma-separated application ids. Whitespace around each id is ignored, blank
+   *               entries are skipped, and a null value is a no-op.
+   */
+  override def deleteApps(appIds: String): Unit = {
+    Option(appIds).toSeq
+      .flatMap(_.split(","))
+      .map(_.trim)
+      .filter(_.nonEmpty)
+      .foreach(appId => statusSystem.deleteApp(appId))
+  }
+
   override def getWorkerEventInfo(): String = {
     val sb = new StringBuilder
     sb.append("======================= Workers Event in Master 
========================\n")
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
index ac326c5fb..447a9b1b2 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
@@ -142,4 +142,5 @@ class ApiMasterResource extends ApiRequestContext {
       workerList)._2)
     sb.toString()
   }
+
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
index 7c08e335e..18cab93e8 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
@@ -17,7 +17,8 @@
 
 package org.apache.celeborn.service.deploy.master.http.api.v1
 
-import javax.ws.rs.{Consumes, GET, Path, Produces}
+import java.util
+import javax.ws.rs.{Consumes, GET, Path, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
 import scala.collection.JavaConverters._
@@ -27,7 +28,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 
 import org.apache.celeborn.common.util.Utils
-import org.apache.celeborn.rest.v1.model.{AppDiskUsageData, 
AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse, 
ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HostnamesResponse}
+import org.apache.celeborn.rest.v1.model.{AppDiskUsageData, 
AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse, 
ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HandleResponse, 
HostnamesResponse}
 import org.apache.celeborn.server.common.http.api.ApiRequestContext
 import org.apache.celeborn.service.deploy.master.Master
 
@@ -94,4 +95,42 @@ class ApplicationResource extends ApiRequestContext {
   def hostnames(): HostnamesResponse = {
     new 
HostnamesResponse().hostnames(statusSystem.hostnameSet.asScala.toSeq.asJava)
   }
+
+  @Path("/reviseLostShuffles")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[HandleResponse]))),
+    description =
+      "Revise lost shuffles")
+  @GET
+  def reviseLostShuffles(
+      @QueryParam("app") appId: String,
+      @QueryParam("shuffleIds") shufflesIds: String): HandleResponse = {
+    // Both query parameters are optional in the API spec, so absent or malformed values are
+    // reported as a failed HandleResponse instead of escaping as NPE / NumberFormatException.
+    val tokens = Option(shufflesIds).toSeq.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)
+    val parsed = tokens.map(token => token -> scala.util.Try(Integer.valueOf(token)).toOption)
+    val invalidIds = parsed.collect { case (token, None) => token }
+    if (appId == null || appId.trim.isEmpty) {
+      new HandleResponse().success(false).message("Missing required query parameter: app")
+    } else if (invalidIds.nonEmpty) {
+      new HandleResponse().success(false)
+        .message(s"Invalid shuffle ids: ${invalidIds.mkString(",")}")
+    } else {
+      val shuffles = new util.ArrayList[Integer](parsed.flatMap(_._2).asJava)
+      if (!shuffles.isEmpty) {
+        httpService.reviseLostShuffles(appId, shuffles)
+      }
+      new HandleResponse().success(true).message("revise lost shuffle done")
+    }
+  }
+
+  @Path("/deleteApps")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[HandleResponse]))),
+    description =
+      "Delete resource of an app")
+  @GET
+  def deleteApp(
+      @QueryParam("apps") apps: String): HandleResponse = {
+    // `apps` is optional in the API spec. Reject a missing/blank value up front instead of
+    // handing null to the service.
+    if (apps == null || apps.trim.isEmpty) {
+      new HandleResponse().success(false).message("Missing required query parameter: apps")
+    } else {
+      httpService.deleteApps(apps)
+      new HandleResponse().success(true).message(s"delete shuffles of app ${apps}")
+    }
+  }
+
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
index 4b7319d46..fbdcd920e 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.service.deploy.master.http.api.v1
 
+import java.util
 import javax.ws.rs.{Consumes, GET, Produces}
 import javax.ws.rs.core.MediaType
 
@@ -44,6 +45,13 @@ class ShuffleResource extends ApiRequestContext {
       "List all running shuffle keys of the service. It will return all 
running shuffle's key of the cluster.")
   @GET
   def shuffles: ShufflesResponse = {
-    new 
ShufflesResponse().shuffleIds(statusSystem.registeredShuffle.asScala.toSeq.asJava)
+    // Flatten the app -> shuffle ids map into "<appId>-<shuffleId>" keys.
+    val shuffles = new util.ArrayList[String]()
+    for {
+      (appId, shuffleIds) <- statusSystem.registeredAppAndShuffles.asScala
+      shuffleId <- shuffleIds.asScala
+    } shuffles.add(s"$appId-$shuffleId")
+    new ShufflesResponse().shuffleIds(shuffles)
   }
 }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index b571e49f6..65660158e 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -472,11 +472,11 @@ public class DefaultMetaSystemSuiteJ {
     statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, 
"testSecret"));
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
 
-    assertEquals(1, statusSystem.registeredShuffle.size());
+    assertEquals(1, statusSystem.registeredAppAndShuffles.size());
     assertEquals(1, statusSystem.applicationMetas.size());
     statusSystem.handleAppLost(APPID1, getNewReqeustId());
 
-    assertTrue(statusSystem.registeredShuffle.isEmpty());
+    assertTrue(statusSystem.registeredAppAndShuffles.isEmpty());
     assertTrue(statusSystem.applicationMetas.isEmpty());
   }
 
@@ -545,11 +545,11 @@ public class DefaultMetaSystemSuiteJ {
 
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
 
-    assertEquals(1, statusSystem.registeredShuffle.size());
+    assertEquals(1, statusSystem.registeredAppAndShuffles.size());
 
     statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId());
 
-    assertTrue(statusSystem.registeredShuffle.isEmpty());
+    assertTrue(statusSystem.registeredAppAndShuffles.isEmpty());
   }
 
   @Test
@@ -621,17 +621,17 @@ public class DefaultMetaSystemSuiteJ {
       shuffleKeysAll.add(shuffleKey);
       statusSystem.handleRequestSlots(shuffleKey, HOSTNAME1, 
workersToAllocate, getNewReqeustId());
     }
-    Assert.assertEquals(4, statusSystem.registeredShuffle.size());
+    Assert.assertEquals(4, statusSystem.registeredShuffleCount());
 
     List<String> shuffleKeys1 = new ArrayList<>();
     shuffleKeys1.add(shuffleKeysAll.get(0));
 
     statusSystem.handleBatchUnRegisterShuffles(shuffleKeys1, 
getNewReqeustId());
-    Assert.assertEquals(3, statusSystem.registeredShuffle.size());
+    Assert.assertEquals(3, statusSystem.registeredShuffleCount());
 
     statusSystem.handleBatchUnRegisterShuffles(shuffleKeysAll, 
getNewReqeustId());
 
-    Assert.assertTrue(statusSystem.registeredShuffle.isEmpty());
+    Assert.assertTrue(statusSystem.registeredShuffleCount() == 0);
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 851d87c1c..4f2fae78a 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -751,16 +751,16 @@ public class RatisMasterStatusSystemSuiteJ {
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(1, STATUSSYSTEM1.registeredShuffle.size());
-    Assert.assertEquals(1, STATUSSYSTEM2.registeredShuffle.size());
-    Assert.assertEquals(1, STATUSSYSTEM3.registeredShuffle.size());
+    Assert.assertEquals(1, STATUSSYSTEM1.registeredAppAndShuffles.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.registeredAppAndShuffles.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.registeredAppAndShuffles.size());
 
     statusSystem.handleAppLost(APPID1, getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
-    Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
-    Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM1.registeredAppAndShuffles.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM2.registeredAppAndShuffles.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM3.registeredAppAndShuffles.isEmpty());
   }
 
   @Test
@@ -832,16 +832,16 @@ public class RatisMasterStatusSystemSuiteJ {
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(1, STATUSSYSTEM1.registeredShuffle.size());
-    Assert.assertEquals(1, STATUSSYSTEM2.registeredShuffle.size());
-    Assert.assertEquals(1, STATUSSYSTEM3.registeredShuffle.size());
+    Assert.assertEquals(1, STATUSSYSTEM1.registeredAppAndShuffles.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.registeredAppAndShuffles.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.registeredAppAndShuffles.size());
 
     statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
-    Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
-    Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM1.registeredAppAndShuffles.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM2.registeredAppAndShuffles.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM3.registeredAppAndShuffles.isEmpty());
   }
 
   @Test
@@ -919,25 +919,25 @@ public class RatisMasterStatusSystemSuiteJ {
 
     Thread.sleep(3000L);
 
-    Assert.assertEquals(4, STATUSSYSTEM1.registeredShuffle.size());
-    Assert.assertEquals(4, STATUSSYSTEM2.registeredShuffle.size());
-    Assert.assertEquals(4, STATUSSYSTEM3.registeredShuffle.size());
+    Assert.assertEquals(4, STATUSSYSTEM1.registeredShuffleCount());
+    Assert.assertEquals(4, STATUSSYSTEM2.registeredShuffleCount());
+    Assert.assertEquals(4, STATUSSYSTEM3.registeredShuffleCount());
 
     List<String> shuffleKeys1 = new ArrayList<>();
     shuffleKeys1.add(shuffleKeysAll.get(0));
 
     statusSystem.handleBatchUnRegisterShuffles(shuffleKeys1, 
getNewReqeustId());
     Thread.sleep(3000L);
-    Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffle.size());
-    Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffle.size());
-    Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffle.size());
+    Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffleCount());
+    Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffleCount());
+    Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffleCount());
 
     statusSystem.handleBatchUnRegisterShuffles(shuffleKeysAll, 
getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
-    Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
-    Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM1.registeredShuffleCount() == 0);
+    Assert.assertTrue(STATUSSYSTEM2.registeredShuffleCount() == 0);
+    Assert.assertTrue(STATUSSYSTEM3.registeredShuffleCount() == 0);
   }
 
   @Test
@@ -1085,21 +1085,21 @@ public class RatisMasterStatusSystemSuiteJ {
 
   @Before
   public void resetStatus() {
-    STATUSSYSTEM1.registeredShuffle.clear();
+    STATUSSYSTEM1.registeredAppAndShuffles.clear();
     STATUSSYSTEM1.hostnameSet.clear();
     STATUSSYSTEM1.workers.clear();
     STATUSSYSTEM1.appHeartbeatTime.clear();
     STATUSSYSTEM1.excludedWorkers.clear();
     STATUSSYSTEM1.workerLostEvents.clear();
 
-    STATUSSYSTEM2.registeredShuffle.clear();
+    STATUSSYSTEM2.registeredAppAndShuffles.clear();
     STATUSSYSTEM2.hostnameSet.clear();
     STATUSSYSTEM2.workers.clear();
     STATUSSYSTEM2.appHeartbeatTime.clear();
     STATUSSYSTEM2.excludedWorkers.clear();
     STATUSSYSTEM2.workerLostEvents.clear();
 
-    STATUSSYSTEM3.registeredShuffle.clear();
+    STATUSSYSTEM3.registeredAppAndShuffles.clear();
     STATUSSYSTEM3.hostnameSet.clear();
     STATUSSYSTEM3.workers.clear();
     STATUSSYSTEM3.appHeartbeatTime.clear();
@@ -1414,6 +1414,19 @@ public class RatisMasterStatusSystemSuiteJ {
         STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
   }
 
+  @Test
+  public void testReviseShuffles() throws InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    // Revising eight shuffles of app-1 on the leader should register them on every replica.
+    statusSystem.handleReviseLostShuffles(
+        "app-1", Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), getNewReqeustId());
+    // Wait as long as the other replication tests in this suite before checking all replicas.
+    Thread.sleep(3000L);
+    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages are right.
+    Assert.assertEquals(8, STATUSSYSTEM1.registeredShuffleCount());
+    Assert.assertEquals(8, STATUSSYSTEM2.registeredShuffleCount());
+    Assert.assertEquals(8, STATUSSYSTEM3.registeredShuffleCount());
+  }
+
   @AfterClass
   public static void testNotifyLogFailed() {
     List<HARaftServer> list = Arrays.asList(RATISSERVER1, RATISSERVER2, 
RATISSERVER3);
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
index 281b821b2..2afac4729 100644
--- 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
@@ -27,6 +27,7 @@ import org.apache.celeborn.rest.v1.master.invoker.Pair;
 
 import org.apache.celeborn.rest.v1.model.AppDiskUsageSnapshotsResponse;
 import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse;
+import org.apache.celeborn.rest.v1.model.HandleResponse;
 import org.apache.celeborn.rest.v1.model.HostnamesResponse;
 
 
@@ -48,6 +49,76 @@ public class ApplicationApi extends BaseApi {
     super(apiClient);
   }
 
+  /**
+   * Deletes the resources of the given applications.
+   * NOTE(review): this client appears to be generated from master_rest_v1.yaml; edits to this
+   * documentation may be overwritten on regeneration, so mirror them in the spec.
+   * @param apps comma-separated application ids (optional)
+   * @return HandleResponse
+   * @throws ApiException if fails to make API call
+   */
+  public HandleResponse deleteApps(String apps) throws ApiException {
+    return this.deleteApps(apps, Collections.emptyMap());
+  }
+
+
+  /**
+   * Deletes the resources of the given applications by issuing
+   * {@code GET /api/v1/applications/deleteApps} with the {@code apps} query parameter.
+   * @param apps comma-separated application ids (optional)
+   * @param additionalHeaders additionalHeaders for this call
+   * @return HandleResponse
+   * @throws ApiException if fails to make API call
+   */
+  public HandleResponse deleteApps(String apps, Map<String, String> 
additionalHeaders) throws ApiException {
+    Object localVarPostBody = null;
+    
+    // create path and map variables
+    String localVarPath = "/api/v1/applications/deleteApps";
+
+    StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+    String localVarQueryParameterBaseName;
+    List<Pair> localVarQueryParams = new ArrayList<Pair>();
+    List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+    Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+    Map<String, String> localVarCookieParams = new HashMap<String, String>();
+    Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+    localVarQueryParams.addAll(apiClient.parameterToPair("apps", apps));
+    
+    localVarHeaderParams.putAll(additionalHeaders);
+
+    
+    
+    final String[] localVarAccepts = {
+      "application/json"
+    };
+    final String localVarAccept = 
apiClient.selectHeaderAccept(localVarAccepts);
+
+    final String[] localVarContentTypes = {
+      
+    };
+    final String localVarContentType = 
apiClient.selectHeaderContentType(localVarContentTypes);
+
+    String[] localVarAuthNames = new String[] { "basic" };
+
+    TypeReference<HandleResponse> localVarReturnType = new 
TypeReference<HandleResponse>() {};
+    return apiClient.invokeAPI(
+        localVarPath,
+        "GET",
+        localVarQueryParams,
+        localVarCollectionQueryParams,
+        localVarQueryStringJoiner.toString(),
+        localVarPostBody,
+        localVarHeaderParams,
+        localVarCookieParams,
+        localVarFormParams,
+        localVarAccept,
+        localVarContentType,
+        localVarAuthNames,
+        localVarReturnType
+    );
+  }
+
   /**
    * 
    * List all running application&#39;s LifecycleManager&#39;s hostnames of 
the cluster.
@@ -249,6 +320,79 @@ public class ApplicationApi extends BaseApi {
     );
   }
 
+  /**
+   * Revise lost shuffles or delete shuffles of an application.
+   * NOTE(review): this client appears to be generated from master_rest_v1.yaml; edits to this
+   * documentation may be overwritten on regeneration, so mirror them in the spec.
+   * @param app id of the application owning the shuffles (optional)
+   * @param shuffleIds comma-separated integer shuffle ids (optional)
+   * @return HandleResponse
+   * @throws ApiException if fails to make API call
+   */
+  public HandleResponse reviseLostShuffles(String app, String shuffleIds) 
throws ApiException {
+    return this.reviseLostShuffles(app, shuffleIds, Collections.emptyMap());
+  }
+
+
+  /**
+   * Revise lost shuffles or delete shuffles of an application, by issuing
+   * {@code GET /api/v1/applications/reviseLostShuffles} with {@code app} and
+   * {@code shuffleIds} query parameters.
+   * @param app id of the application owning the shuffles (optional)
+   * @param shuffleIds comma-separated integer shuffle ids (optional)
+   * @param additionalHeaders additionalHeaders for this call
+   * @return HandleResponse
+   * @throws ApiException if fails to make API call
+   */
+  public HandleResponse reviseLostShuffles(String app, String shuffleIds, 
Map<String, String> additionalHeaders) throws ApiException {
+    Object localVarPostBody = null;
+    
+    // create path and map variables
+    String localVarPath = "/api/v1/applications/reviseLostShuffles";
+
+    StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+    String localVarQueryParameterBaseName;
+    List<Pair> localVarQueryParams = new ArrayList<Pair>();
+    List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+    Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+    Map<String, String> localVarCookieParams = new HashMap<String, String>();
+    Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+    localVarQueryParams.addAll(apiClient.parameterToPair("app", app));
+    localVarQueryParams.addAll(apiClient.parameterToPair("shuffleIds", 
shuffleIds));
+    
+    localVarHeaderParams.putAll(additionalHeaders);
+
+    
+    
+    final String[] localVarAccepts = {
+      "application/json"
+    };
+    final String localVarAccept = 
apiClient.selectHeaderAccept(localVarAccepts);
+
+    final String[] localVarContentTypes = {
+      
+    };
+    final String localVarContentType = 
apiClient.selectHeaderContentType(localVarContentTypes);
+
+    String[] localVarAuthNames = new String[] { "basic" };
+
+    TypeReference<HandleResponse> localVarReturnType = new 
TypeReference<HandleResponse>() {};
+    return apiClient.invokeAPI(
+        localVarPath,
+        "GET",
+        localVarQueryParams,
+        localVarCollectionQueryParams,
+        localVarQueryStringJoiner.toString(),
+        localVarPostBody,
+        localVarHeaderParams,
+        localVarCookieParams,
+        localVarFormParams,
+        localVarAccept,
+        localVarContentType,
+        localVarAuthNames,
+        localVarReturnType
+    );
+  }
+
   @Override
   public <T> T invokeAPI(String url, String method, Object request, 
TypeReference<T> returnType, Map<String, String> additionalHeaders) throws 
ApiException {
     String localVarPath = url.replace(apiClient.getBaseURL(), "");
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
index e1ef4fb8c..4c9dfe81f 100644
--- 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
@@ -44,7 +44,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
   ContainerInfo.JSON_PROPERTY_CONTAINER_CLUSTER,
   ContainerInfo.JSON_PROPERTY_CONTAINER_TAGS
 })
[email protected](value = 
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator 
version: 7.7.0")
[email protected](value = 
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator 
version: 7.8.0")
 public class ContainerInfo {
   public static final String JSON_PROPERTY_CONTAINER_NAME = "containerName";
   private String containerName;
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml 
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index 7c8d5cd67..60528efcb 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -270,6 +270,52 @@ paths:
               schema:
                 $ref: '#/components/schemas/HostnamesResponse'
 
+  /api/v1/applications/reviseLostShuffles:
+    get:
+      tags:
+        - Application
+      operationId: reviseLostShuffles
+      description: Revise lost shuffles or delete shuffles of an application.
+      parameters:
+        - name: app
+          in: query
+          required: false
+          schema:
+            type: string
+        - name: shuffleIds
+          in: query
+          required: false
+          schema:
+            type: string
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HandleResponse'
+
+  /api/v1/applications/deleteApps:
+    get:
+      tags:
+        - Application
+      operationId: deleteApps
+      description: Delete resource of apps
+      parameters:
+        - name: apps
+          in: query
+          required: false
+          schema:
+            type: string
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HandleResponse'
+
+
   /api/v1/ratis/election/transfer:
     post:
       tags:
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index bb98e88f6..e8610dfdc 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -194,6 +194,10 @@ abstract class HttpService extends Service with Logging {
 
   def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
 
+  def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer])
+
+  def deleteApps(appIds: String)
+
   def startHttpServer(): Unit = {
     httpServer = HttpServer(
       serviceName,
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
index 3b7d74df8..d689ead7e 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
@@ -28,7 +28,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.protocol.message.StatusCode
-import org.apache.celeborn.common.util.Utils
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 
 class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite
@@ -60,8 +59,8 @@ class LifecycleManagerUnregisterShuffleSuite extends 
WithShuffleClientSuite
       assert(res.status == StatusCode.SUCCESS)
       lifecycleManager.registeredShuffle.add(shuffleId)
       assert(lifecycleManager.registeredShuffle.contains(shuffleId))
-      val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
-      assert(masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+      
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(APP))
+      
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId))
       lifecycleManager.commitManager.setStageEnd(shuffleId)
     }
 
@@ -71,9 +70,10 @@ class LifecycleManagerUnregisterShuffleSuite extends 
WithShuffleClientSuite
     // after unregister shuffle
     eventually(timeout(120.seconds), interval(2.seconds)) {
       shuffleIds.foreach { shuffleId: Int =>
-        val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
         assert(!lifecycleManager.registeredShuffle.contains(shuffleId))
-        
assert(!masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+        val containShuffleKey = 
masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(
+          APP) && 
masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId)
+        assert(!containShuffleKey)
       }
     }
 
@@ -93,8 +93,8 @@ class LifecycleManagerUnregisterShuffleSuite extends 
WithShuffleClientSuite
       assert(res.status == StatusCode.SUCCESS)
       lifecycleManager.registeredShuffle.add(shuffleId)
       assert(lifecycleManager.registeredShuffle.contains(shuffleId))
-      val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
-      assert(masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+      
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(APP))
+      
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId))
       lifecycleManager.commitManager.setStageEnd(shuffleId)
     }
     val previousTime = System.currentTimeMillis()
@@ -104,9 +104,10 @@ class LifecycleManagerUnregisterShuffleSuite extends 
WithShuffleClientSuite
     // after unregister shuffle
     eventually(timeout(120.seconds), interval(2.seconds)) {
       shuffleIds.foreach { shuffleId: Int =>
-        val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
         assert(!lifecycleManager.registeredShuffle.contains(shuffleId))
-        
assert(!masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+        val containShuffleKey = 
masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(
+          APP) && 
masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId)
+        assert(!containShuffleKey)
       }
     }
     val currentTime = System.currentTimeMillis()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index b9bfff05b..62602cd06 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -866,6 +866,11 @@ private[celeborn] class Worker(
     sb.toString()
   }
 
+  override def reviseLostShuffles(appId: String, shuffles: 
java.util.List[Integer]): Unit =
+    throw new UnsupportedOperationException()
+
+  override def deleteApps(appIds: String): Unit = throw new 
UnsupportedOperationException()
+
   override def exit(exitType: String): String = {
     exitType.toUpperCase(Locale.ROOT) match {
       case "DECOMMISSION" =>

Reply via email to