This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new df01fadc9 [CELEBORN-1601] Support revise lost shuffles
df01fadc9 is described below
commit df01fadc9fa7dd9c2781105c82962dbb8f096603
Author: mingji <[email protected]>
AuthorDate: Mon Oct 21 16:44:37 2024 +0800
[CELEBORN-1601] Support revise lost shuffles
### What changes were proposed in this pull request?
Support revising lost shuffle IDs in long-running jobs, such as Flink
batch jobs.
### Why are the changes needed?
1. To support revising lost shuffles.
2. To add an HTTP endpoint to revise lost shuffles manually.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster tests.
Closes #2746 from FMX/b1600.
Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/cli/common/CommonOptions.scala | 7 +
.../apache/celeborn/cli/master/MasterOptions.scala | 10 ++
.../celeborn/cli/master/MasterSubcommand.scala | 7 +
.../celeborn/cli/master/MasterSubcommandImpl.scala | 18 +++
.../cli/master/ReviseLostShuffleOptions.scala | 29 +++++
.../celeborn/cli/TestCelebornCliCommands.scala | 26 ++++
.../celeborn/client/ApplicationHeartbeater.scala | 38 +++++-
.../apache/celeborn/client/LifecycleManager.scala | 3 +-
.../celeborn/client/WorkerStatusTrackerSuite.scala | 3 +-
common/src/main/proto/TransportMessages.proto | 14 ++
.../org/apache/celeborn/common/CelebornConf.scala | 9 ++
.../common/protocol/message/ControlMessages.scala | 36 +++++-
.../apache/celeborn/common/util/PbSerDeUtils.scala | 6 +-
docs/configuration/client.md | 1 +
.../master/clustermeta/AbstractMetaManager.java | 73 ++++++++---
.../master/clustermeta/IMetadataHandler.java | 2 +
.../clustermeta/SingleMasterMetaManager.java | 5 +
.../master/clustermeta/ha/HAMasterMetaManager.java | 19 +++
.../deploy/master/clustermeta/ha/MetaHandler.java | 10 ++
master/src/main/proto/Resource.proto | 8 ++
.../celeborn/service/deploy/master/Master.scala | 51 +++++++-
.../deploy/master/http/api/ApiMasterResource.scala | 1 +
.../master/http/api/v1/ApplicationResource.scala | 43 +++++-
.../master/http/api/v1/ShuffleResource.scala | 10 +-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 14 +-
.../ha/RatisMasterStatusSystemSuiteJ.java | 61 +++++----
.../celeborn/rest/v1/master/ApplicationApi.java | 144 +++++++++++++++++++++
.../celeborn/rest/v1/model/ContainerInfo.java | 2 +-
.../src/main/openapi3/master_rest_v1.yaml | 46 +++++++
.../celeborn/server/common/HttpService.scala | 4 +
.../LifecycleManagerUnregisterShuffleSuite.scala | 19 +--
.../celeborn/service/deploy/worker/Worker.scala | 5 +
32 files changed, 647 insertions(+), 77 deletions(-)
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
index 507d5374f..69c243f94 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
@@ -71,4 +71,11 @@ class CommonOptions {
paramLabel = "username",
description = Array("The username of the TENANT_USER level."))
private[cli] var configName: String = _
+
+ @Option(
+ names = Array("--apps"),
+ paramLabel = "appId",
+ description = Array("The application Id list seperated by comma."))
+ private[cli] var apps: String = _
+
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
index d69a2e6bf..840a54d23 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -110,4 +110,14 @@ final class MasterOptions {
names = Array("--remove-workers-unavailable-info"),
description = Array("Remove the workers unavailable info from the
master."))
private[master] var removeWorkersUnavailableInfo: Boolean = _
+
+ @Option(
+ names = Array("--revise-lost-shuffles"),
+ description = Array("Revise lost shuffles or remove shuffles for an
application."))
+ private[master] var reviseLostShuffles: Boolean = _
+
+ @Option(
+ names = Array("--delete-apps"),
+ description = Array("Delete resource of an application."))
+ private[master] var deleteApps: Boolean = _
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
index 222650396..a875621e2 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -37,6 +37,9 @@ trait MasterSubcommand extends CliLogging {
@ArgGroup(exclusive = true, multiplicity = "1")
private[master] var masterOptions: MasterOptions = _
+ @ArgGroup(exclusive = false)
+ private[master] var reviseLostShuffleOptions: ReviseLostShuffleOptions = _
+
@Mixin
private[master] var commonOptions: CommonOptions = _
@@ -110,4 +113,8 @@ trait MasterSubcommand extends CliLogging {
private[master] def runShowThreadDump: ThreadStackResponse
+ private[master] def reviseLostShuffles: HandleResponse
+
+ private[master] def deleteApps: HandleResponse
+
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index 32f540f4b..70bd32c63 100644
---
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -51,6 +51,8 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
if (masterOptions.showContainerInfo) log(runShowContainerInfo)
if (masterOptions.showDynamicConf) log(runShowDynamicConf)
if (masterOptions.showThreadDump) log(runShowThreadDump)
+ if (masterOptions.reviseLostShuffles) log(reviseLostShuffles)
+ if (masterOptions.deleteApps) log(deleteApps)
if (masterOptions.addClusterAlias != null &&
masterOptions.addClusterAlias.nonEmpty)
runAddClusterAlias
if (masterOptions.removeClusterAlias != null &&
masterOptions.removeClusterAlias.nonEmpty)
@@ -220,4 +222,20 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
}
private[master] def runShowContainerInfo: ContainerInfo =
defaultApi.getContainerInfo
+
+ override private[master] def reviseLostShuffles: HandleResponse = {
+ val app = commonOptions.apps
+ if (app.contains(",")) {
+ throw new ParameterException(
+ spec.commandLine(),
+ "Only one application id can be provided for this command.")
+ }
+ val shuffleIds = reviseLostShuffleOptions.shuffleIds
+ applicationApi.reviseLostShuffles(app, shuffleIds)
+ }
+
+ override private[master] def deleteApps: HandleResponse = {
+ val apps = commonOptions.apps
+ applicationApi.deleteApps(apps)
+ }
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala
new file mode 100644
index 000000000..a0c4963fe
--- /dev/null
+++
b/cli/src/main/scala/org/apache/celeborn/cli/master/ReviseLostShuffleOptions.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import picocli.CommandLine.Option
+
+final class ReviseLostShuffleOptions {
+
+ @Option(
+ names = Array("--shuffleIds"),
+ description = Array("The shuffle ids to manipulate."))
+ private[master] var shuffleIds: String = _
+
+}
diff --git
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index 56b4ea213..1f450a19f 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -247,6 +247,32 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
captureOutputAndValidateResponse(args, "success: true")
}
+ test("master --delete-apps case1") {
+ val args = prepareMasterArgs() ++ Array(
+ "--delete-apps",
+ "--apps",
+ "app1")
+ captureOutputAndValidateResponse(args, "success: true")
+ }
+
+ test("master --delete-apps case2") {
+ val args = prepareMasterArgs() ++ Array(
+ "--delete-apps",
+ "--apps",
+ "app1,app2")
+ captureOutputAndValidateResponse(args, "success: true")
+ }
+
+ test("master --revise-lost-shuffles case1") {
+ val args = prepareMasterArgs() ++ Array(
+ "--revise-lost-shuffles",
+ "--apps",
+ "app1",
+ "--shuffleIds",
+ "1,2,3,4,5,6")
+ captureOutputAndValidateResponse(args, "success: true")
+ }
+
private def prepareMasterArgs(): Array[String] = {
Array(
"master",
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index d3af38183..0b850ff52 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -17,14 +17,19 @@
package org.apache.celeborn.client
-import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util
+import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
+import java.util.function.Consumer
import scala.collection.JavaConverters._
+import org.apache.commons.lang3.StringUtils
+
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
-import
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost,
ApplicationLostResponse, HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ZERO_UUID}
+import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse
+import
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost,
ApplicationLostResponse, HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
@@ -33,9 +38,11 @@ class ApplicationHeartbeater(
conf: CelebornConf,
masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
- workerStatusTracker: WorkerStatusTracker) extends Logging {
+ workerStatusTracker: WorkerStatusTracker,
+ registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean])
extends Logging {
private var stopped = false
+ private val reviseLostShuffles = conf.reviseLostShufflesEnabled
// Use independent app heartbeat threads to avoid being blocked by other
operations.
private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
@@ -68,6 +75,30 @@ class ApplicationHeartbeater(
if (response.statusCode == StatusCode.SUCCESS) {
logDebug("Successfully send app heartbeat.")
workerStatusTracker.handleHeartbeatResponse(response)
+ // revise shuffle id if there are lost shuffles
+ if (reviseLostShuffles) {
+ val masterRecordedShuffleIds = response.registeredShuffles
+ val localOnlyShuffles = new util.ArrayList[Integer]()
+ registeredShuffles.forEach(new Consumer[Int] {
+ override def accept(key: Int): Unit = {
+ localOnlyShuffles.add(key)
+ }
+ })
+ localOnlyShuffles.removeAll(masterRecordedShuffleIds)
+ if (!localOnlyShuffles.isEmpty) {
+ logWarning(
+ s"There are lost shuffle found
${StringUtils.join(localOnlyShuffles, ",")}, revise lost shuffles.")
+ val reviseLostShufflesResponse = masterClient.askSync(
+ ReviseLostShuffles.apply(appId, localOnlyShuffles,
MasterClient.genRequestId()),
+ classOf[PbReviseLostShufflesResponse])
+ if (!reviseLostShufflesResponse.getSuccess) {
+ logWarning(
+ s"Revise lost shuffles failed. Error message
:${reviseLostShufflesResponse.getMessage}")
+ } else {
+ logInfo("Revise lost shuffles succeed.")
+ }
+ }
+ }
}
} catch {
case it: InterruptedException =>
@@ -97,6 +128,7 @@ class ApplicationHeartbeater(
StatusCode.REQUEST_FAILED,
List.empty.asJava,
List.empty.asJava,
+ List.empty.asJava,
List.empty.asJava)
}
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c4755fbf1..d35802958 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -210,7 +210,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
conf,
masterClient,
() => commitManager.commitMetrics(),
- workerStatusTracker)
+ workerStatusTracker,
+ registeredShuffle)
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)
diff --git
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index 1f606dd32..d2c21245d 100644
---
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -95,7 +95,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
StatusCode.SUCCESS,
excludedWorkers,
unknownWorkers,
- shuttingWorkers)
+ shuttingWorkers,
+ new util.ArrayList[Integer]())
}
private def mockWorkers(workerHosts: Array[String]):
util.ArrayList[WorkerInfo] = {
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 7d5bb2c5e..bc8da08ab 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -109,6 +109,8 @@ enum MessageType {
NOTIFY_REQUIRED_SEGMENT = 86;
BATCH_UNREGISTER_SHUFFLES = 87;
BATCH_UNREGISTER_SHUFFLE_RESPONSE= 88;
+ REVISE_LOST_SHUFFLES = 89;
+ REVISE_LOST_SHUFFLES_RESPONSE = 90;
}
enum StreamType {
@@ -447,6 +449,7 @@ message PbHeartbeatFromApplicationResponse {
repeated PbWorkerInfo excludedWorkers = 2;
repeated PbWorkerInfo unknownWorkers = 3;
repeated PbWorkerInfo shuttingWorkers = 4;
+ repeated int32 registeredShuffles = 5;
}
message PbCheckQuota {
@@ -856,3 +859,14 @@ message PbReportWorkerDecommission {
repeated PbWorkerInfo workers = 1;
string requestId = 2;
}
+
+message PbReviseLostShuffles{
+ string appId = 1;
+ repeated int32 lostShuffles = 2;
+ string requestId = 3;
+}
+
+message PbReviseLostShufflesResponse{
+ bool success = 1;
+ string message = 2;
+}
\ No newline at end of file
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index bf1af9b8a..3ec230654 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1071,6 +1071,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def registerShuffleFilterExcludedWorkerEnabled: Boolean =
get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
+ def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)
// //////////////////////////////////////////////////////
// Worker //
@@ -5636,6 +5637,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val REVISE_LOST_SHUFFLES_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.shuffle.reviseLostShuffles.enabled")
+ .categories("client")
+ .version("0.6.0")
+ .doc("Whether to revise lost shuffles.")
+ .booleanConf
+ .createWithDefault(false)
+
val NETWORK_IO_SASL_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.saslTimeout")
.categories("network")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index bf4e7f269..d0eb85b02 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -335,6 +335,26 @@ object ControlMessages extends Logging {
.build()
}
+ object ReviseLostShuffles {
+ def apply(
+ appId: String,
+ lostShuffles: java.util.List[Integer],
+ requestId: String): PbReviseLostShuffles =
+ PbReviseLostShuffles.newBuilder()
+ .setAppId(appId)
+ .addAllLostShuffles(lostShuffles)
+ .setRequestId(requestId)
+ .build()
+ }
+
+ object ReviseLostShufflesResponse {
+ def apply(success: Boolean, message: String): PbReviseLostShufflesResponse
=
+ PbReviseLostShufflesResponse.newBuilder()
+ .setSuccess(success)
+ .setMessage(message)
+ .build()
+ }
+
case class StageEnd(shuffleId: Int) extends MasterMessage
case class StageEndResponse(status: StatusCode)
@@ -400,7 +420,8 @@ object ControlMessages extends Logging {
statusCode: StatusCode,
excludedWorkers: util.List[WorkerInfo],
unknownWorkers: util.List[WorkerInfo],
- shuttingWorkers: util.List[WorkerInfo]) extends Message
+ shuttingWorkers: util.List[WorkerInfo],
+ registeredShuffles: util.List[Integer]) extends Message
case class CheckQuota(userIdentifier: UserIdentifier) extends Message
@@ -565,6 +586,12 @@ object ControlMessages extends Logging {
case pb: PbReportShuffleFetchFailureResponse =>
new TransportMessage(MessageType.REPORT_SHUFFLE_FETCH_FAILURE_RESPONSE,
pb.toByteArray)
+ case pb: PbReviseLostShuffles =>
+ new TransportMessage(MessageType.REVISE_LOST_SHUFFLES, pb.toByteArray)
+
+ case pb: PbReviseLostShufflesResponse =>
+ new TransportMessage(MessageType.REVISE_LOST_SHUFFLES_RESPONSE,
pb.toByteArray)
+
case pb: PbReportBarrierStageAttemptFailure =>
new TransportMessage(MessageType.REPORT_BARRIER_STAGE_ATTEMPT_FAILURE,
pb.toByteArray)
@@ -799,7 +826,8 @@ object ControlMessages extends Logging {
statusCode,
excludedWorkers,
unknownWorkers,
- shuttingWorkers) =>
+ shuttingWorkers,
+ registeredShuffles) =>
val payload = PbHeartbeatFromApplicationResponse.newBuilder()
.setStatus(statusCode.getValue)
.addAllExcludedWorkers(
@@ -808,6 +836,7 @@ object ControlMessages extends Logging {
unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true,
true)).toList.asJava)
.addAllShuttingWorkers(
shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true,
true)).toList.asJava)
+ .addAllRegisteredShuffles(registeredShuffles)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE,
payload)
@@ -1191,7 +1220,8 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
- .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+ .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
+ pbHeartbeatFromApplicationResponse.getRegisteredShufflesList)
case CHECK_QUOTA_VALUE =>
val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 30f00f179..f9f01b9a7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -450,7 +450,7 @@ object PbSerDeUtils {
def toPbSnapshotMetaInfo(
estimatedPartitionSize: java.lang.Long,
- registeredShuffle: java.util.Set[String],
+ registeredShuffle: java.util.Map[String, java.util.Set[Integer]],
hostnameSet: java.util.Set[String],
excludedWorkers: java.util.Set[WorkerInfo],
manuallyExcludedWorkers: java.util.Set[WorkerInfo],
@@ -468,7 +468,9 @@ object PbSerDeUtils {
decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
- .addAllRegisteredShuffle(registeredShuffle)
+ .addAllRegisteredShuffle(registeredShuffle.asScala.flatMap {
appIdAndShuffleId =>
+ appIdAndShuffleId._2.asScala.map(i =>
Utils.makeShuffleKey(appIdAndShuffleId._1, i))
+ }.asJava)
.addAllHostnameSet(hostnameSet)
.addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_,
true, false)).asJava)
.addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 91d1faac5..ce309a13f 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -103,6 +103,7 @@ license: |
| celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file
size threshold, if file size exceeds this, trigger split. | 0.3.0 |
celeborn.shuffle.partitionSplit.threshold |
| celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark
application have skewed partition, this value can set to true to improve
performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false |
false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
+| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether
to revise lost shuffles. | 0.6.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that
slots of one shuffle can be allocated on. Will choose the smaller positive one
from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`.
| 0.3.1 | |
| celeborn.client.spark.fetch.throwsFetchFailure | false | false | client
throws FetchFailedException instead of CelebornIOException | 0.4.0 | |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | false |
Whether to dynamically switch push write mode based on conditions.If true,
shuffle mode will be only determined by partition count | 0.5.0 | |
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 1631e90d3..5c36dd23f 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -22,10 +22,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,6 +30,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import scala.Option;
+import scala.Tuple2;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -63,7 +61,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractMetaManager.class);
// Metadata for master service
- public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
+ public final Map<String, Set<Integer>> registeredAppAndShuffles =
+ JavaUtils.newConcurrentHashMap();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();
@@ -92,9 +91,12 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateRequestSlotsMeta(
String shuffleKey, String hostName, Map<String, Map<String, Integer>>
workerWithAllocations) {
- registeredShuffle.add(shuffleKey);
+ Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
+ registeredAppAndShuffles
+ .computeIfAbsent(appIdShuffleId._1(), v -> new HashSet<>())
+ .add((Integer) appIdShuffleId._2);
- String appId = Utils.splitShuffleKey(shuffleKey)._1;
+ String appId = appIdShuffleId._1;
appHeartbeatTime.compute(
appId,
(applicationId, oldTimestamp) -> {
@@ -111,11 +113,29 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
public void updateUnregisterShuffleMeta(String shuffleKey) {
- registeredShuffle.remove(shuffleKey);
+ Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
+ Set<Integer> shuffleIds =
registeredAppAndShuffles.get(appIdShuffleId._1());
+ if (shuffleIds != null) {
+ shuffleIds.remove(appIdShuffleId._2);
+ registeredAppAndShuffles.compute(
+ appIdShuffleId._1(),
+ (s, shuffles) -> {
+ if (shuffles.size() == 0) {
+ return null;
+ }
+ return shuffles;
+ });
+ }
}
public void updateBatchUnregisterShuffleMeta(List<String> shuffleKeys) {
- registeredShuffle.removeAll(shuffleKeys);
+ for (String shuffleKey : shuffleKeys) {
+ Tuple2<String, Object> appIdShuffleId =
Utils.splitShuffleKey(shuffleKey);
+ String appId = appIdShuffleId._1;
+ if (registeredAppAndShuffles.containsKey(appId)) {
+ registeredAppAndShuffles.get(appId).remove(appIdShuffleId._2);
+ }
+ }
}
public void updateAppHeartbeatMeta(String appId, long time, long
totalWritten, long fileCount) {
@@ -125,7 +145,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
public void updateAppLostMeta(String appId) {
- registeredShuffle.removeIf(shuffleKey -> shuffleKey.startsWith(appId));
+ registeredAppAndShuffles.remove(appId);
appHeartbeatTime.remove(appId);
applicationMetas.remove(appId);
}
@@ -136,6 +156,14 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
workersToRemove.forEach(manuallyExcludedWorkers::remove);
}
+ public void reviseLostShuffles(String appId, List<Integer> lostShuffles) {
+ registeredAppAndShuffles.computeIfAbsent(appId, v -> new
HashSet<>()).addAll(lostShuffles);
+ }
+
+ public void deleteApp(String appId) {
+ registeredAppAndShuffles.remove(appId);
+ }
+
public void updateWorkerLostMeta(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort) {
WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
@@ -280,7 +308,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
byte[] snapshotBytes =
PbSerDeUtils.toPbSnapshotMetaInfo(
estimatedPartitionSize,
- registeredShuffle,
+ registeredAppAndShuffles,
hostnameSet,
excludedWorkers,
manuallyExcludedWorkers,
@@ -313,7 +341,12 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
- registeredShuffle.addAll(snapshotMetaInfo.getRegisteredShuffleList());
+ for (String shuffleKey : snapshotMetaInfo.getRegisteredShuffleList()) {
+ Tuple2<String, Object> appIdShuffleId =
Utils.splitShuffleKey(shuffleKey);
+ registeredAppAndShuffles
+ .computeIfAbsent(appIdShuffleId._1, v -> new HashSet<>())
+ .add((Integer) appIdShuffleId._2);
+ }
hostnameSet.addAll(snapshotMetaInfo.getHostnameSetList());
excludedWorkers.addAll(
snapshotMetaInfo.getExcludedWorkersList().stream()
@@ -329,9 +362,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
.collect(Collectors.toSet()));
appHeartbeatTime.putAll(snapshotMetaInfo.getAppHeartbeatTimeMap());
- registeredShuffle.forEach(
- shuffleKey -> {
- String appId = Utils.splitShuffleKey(shuffleKey)._1;
+ registeredAppAndShuffles.forEach(
+ (appId, shuffleId) -> {
if (!appHeartbeatTime.containsKey(appId)) {
appHeartbeatTime.put(appId, System.currentTimeMillis());
}
@@ -405,15 +437,16 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
LOG.info(
"Worker size: {}, Registered shuffle size: {}. Worker excluded list
size: {}. Manually Excluded list size: {}",
workers.size(),
- registeredShuffle.size(),
+ registeredAppAndShuffles.size(),
excludedWorkers.size(),
manuallyExcludedWorkers.size());
workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
- registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}",
shuffle));
+ registeredAppAndShuffles.forEach(
+ (appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId,
shuffleId));
}
private void cleanUpState() {
- registeredShuffle.clear();
+ registeredAppAndShuffles.clear();
hostnameSet.clear();
workers.clear();
lostWorkers.clear();
@@ -504,4 +537,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
}
+
+ public int registeredShuffleCount() {
+ return
registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
+ }
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 7eb72679e..9e17728d6 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -46,6 +46,8 @@ public interface IMetadataHandler {
void handleWorkerExclude(
List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String
requestId);
+ void handleReviseLostShuffles(String appId, List<Integer> shuffles, String
requestId);
+
void handleWorkerLost(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index b311b1b45..39bac5330 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -87,6 +87,11 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
updateWorkerExcludeMeta(workersToAdd, workersToRemove);
}
+ @Override
+ public void handleReviseLostShuffles(String appId, List<Integer> shuffles,
String requestId) {
+ reviseLostShuffles(appId, shuffles);
+ }
+
@Override
public void handleWorkerLost(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index d6c95afd4..1d5117991 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -194,6 +194,25 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
}
}
+  /**
+   * Wraps {@code appId} and {@code shuffles} in a {@code ReviseLostShuffles} resource request
+   * and submits it to the Ratis server. A {@code CelebornRuntimeException} raised by the
+   * submission is logged and rethrown to the caller.
+   */
+  @Override
+  public void handleReviseLostShuffles(String appId, List<Integer> shuffles, String requestId) {
+    ResourceProtos.ReviseLostShufflesRequest revisePayload =
+        ResourceProtos.ReviseLostShufflesRequest.newBuilder()
+            .setAppId(appId)
+            .addAllLostShuffles(shuffles)
+            .build();
+    ResourceRequest resourceRequest =
+        ResourceRequest.newBuilder()
+            .setCmdType(Type.ReviseLostShuffles)
+            .setRequestId(requestId)
+            .setReviseLostShufflesRequest(revisePayload)
+            .build();
+    try {
+      ratisServer.submitRequest(resourceRequest);
+    } catch (CelebornRuntimeException e) {
+      LOG.error("Handle revise lost shuffle failed!", e);
+      throw e;
+    }
+  }
+
@Override
public void handleWorkerLost(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index a7948dde7..4ec1dac37 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +102,16 @@ public class MetaHandler {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption;
Map<String, Long> estimatedAppDiskUsage = new HashMap<>();
WorkerStatus workerStatus;
+ List<Integer> lostShuffles;
switch (cmdType) {
+ case ReviseLostShuffles:
+ appId = request.getReviseLostShufflesRequest().getAppId();
+ lostShuffles =
request.getReviseLostShufflesRequest().getLostShufflesList();
+ LOG.info(
+ "Handle revise lost shuffles for {} {}", appId,
StringUtils.join(lostShuffles, ","));
+ metaSystem.reviseLostShuffles(appId, lostShuffles);
+ break;
+
case RequestSlots:
shuffleKey = request.getRequestSlotsRequest().getShuffleKey();
LOG.debug("Handle request slots for {}", shuffleKey);
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index b83599409..acb1d6097 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -41,6 +41,8 @@ enum Type {
ApplicationMeta = 26;
ReportWorkerDecommission = 27;
BatchUnRegisterShuffle = 28;
+
+ ReviseLostShuffles = 29;
}
enum WorkerEventType {
@@ -75,6 +77,7 @@ message ResourceRequest {
optional ApplicationMetaRequest applicationMetaRequest = 23;
optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest =
24;
optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25;
+ optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102;
}
message DiskInfo {
@@ -243,3 +246,8 @@ message ApplicationMetaRequest {
required string appId = 1;
optional string secret = 2;
}
+
+// Payload of a ReviseLostShuffles ResourceRequest: the application id and the
+// shuffle ids to revise for it.
+message ReviseLostShufflesRequest {
+  required string appId = 1;
+  repeated int32 lostShuffles = 2;
+}
\ No newline at end of file
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ec6e550bb..86baaa921 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.BindException
import java.util
+import java.util.Collections
import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.ToLongFunction
@@ -224,7 +225,7 @@ private[celeborn] class Master(
private var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _
masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
- statusSystem.registeredShuffle.size
+ statusSystem.registeredShuffleCount
}
masterSource.addGauge(MasterSource.WORKER_COUNT) { () =>
statusSystem.workers.size }
masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () =>
statusSystem.lostWorkers.size }
@@ -530,6 +531,11 @@ private[celeborn] class Master(
context,
handleWorkerDecommission(context, workers, requestId))
+ case pb: PbReviseLostShuffles =>
+ executeWithLeaderChecker(
+ context,
+ handleReviseLostShuffle(context, pb.getAppId, pb.getLostShufflesList,
pb.getRequestId))
+
case pb: PbWorkerExclude =>
val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
@@ -684,7 +690,11 @@ private[celeborn] class Master(
val expiredShuffleKeys = new util.HashSet[String]
activeShuffleKeys.asScala.foreach { shuffleKey =>
- if (!statusSystem.registeredShuffle.contains(shuffleKey)) {
+ val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
+ val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId)
+ if (shuffleIds == null || !shuffleIds.contains(shuffleId)) {
+ logWarning(
+ s"Shuffle $shuffleKey expired on
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
expiredShuffleKeys.add(shuffleKey)
}
}
@@ -704,6 +714,23 @@ private[celeborn] class Master(
}
}
+  /**
+   * Handles a revise-lost-shuffles request: forwards the shuffle ids to the status system
+   * and answers the caller with a `ReviseLostShufflesResponse`.
+   *
+   * @param context      RPC context used for the reply; every reply is skipped when it is null
+   * @param appId        application whose shuffles are being revised
+   * @param lostShuffles shuffle ids to revise for `appId`
+   * @param requestId    request id handed to the status system
+   */
+  private def handleReviseLostShuffle(
+      context: RpcCallContext,
+      appId: String,
+      lostShuffles: java.util.List[Integer],
+      requestId: String): Unit = {
+    // The success path already tolerated a null context; the failure path must do the same,
+    // otherwise a failure turns into a NullPointerException that hides the real error.
+    def reply(success: Boolean, message: String): Unit = {
+      if (context != null) {
+        context.reply(ReviseLostShufflesResponse(success, message))
+      }
+    }
+
+    try {
+      logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ")
+      statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId)
+      reply(true, "")
+    } catch {
+      case e: Exception =>
+        logError(s"Handle lost shuffles for ${appId} failed.", e)
+        // Throwable.getMessage may be null; fall back to toString so the reply always has text.
+        reply(false, Option(e.getMessage).getOrElse(e.toString))
+    }
+  }
+
private def handleWorkerExclude(
context: RpcCallContext,
workersToAdd: util.List[WorkerInfo],
@@ -1099,13 +1126,16 @@ private[celeborn] class Master(
if (shouldResponse) {
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplicationResponse
+ var appRelatedShuffles =
+ statusSystem.registeredAppAndShuffles.getOrDefault(appId,
Collections.emptySet())
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
new util.ArrayList(
(statusSystem.excludedWorkers.asScala ++
statusSystem.manuallyExcludedWorkers.asScala).asJava),
needCheckedWorkerList,
new util.ArrayList[WorkerInfo](
- (statusSystem.shutdownWorkers.asScala ++
statusSystem.decommissionWorkers.asScala).asJava)))
+ (statusSystem.shutdownWorkers.asScala ++
statusSystem.decommissionWorkers.asScala).asJava),
+ new util.ArrayList(appRelatedShuffles)))
} else {
context.reply(OneWayMessageResponse)
}
@@ -1339,8 +1369,11 @@ private[celeborn] class Master(
override def getShuffleList: String = {
val sb = new StringBuilder
sb.append("======================= Shuffle Key List
============================\n")
- statusSystem.registeredShuffle.asScala.foreach { shuffleKey =>
- sb.append(s"$shuffleKey\n")
+ statusSystem.registeredAppAndShuffles.asScala.foreach { shuffleKey =>
+ val appId = shuffleKey._1
+ shuffleKey._2.asScala.foreach { id =>
+ sb.append(s"$appId-${id}\n")
+ }
}
sb.toString()
}
@@ -1442,6 +1475,14 @@ private[celeborn] class Master(
}
}
+  /**
+   * HTTP-facing entry point for revising the lost shuffles of an application.
+   *
+   * Uses `handleReviseLostShuffles`, the same status-system call the RPC handler makes,
+   * instead of mutating this master's metadata directly, so the change takes the
+   * metadata handler's normal path (a Ratis request in the HA meta manager).
+   *
+   * @param appId    application whose shuffles are being revised
+   * @param shuffles shuffle ids to revise for `appId`
+   */
+  override def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]): Unit = {
+    // HTTP calls carry no request id, so generate one for this submission.
+    val requestId = java.util.UUID.randomUUID().toString
+    statusSystem.handleReviseLostShuffles(appId, shuffles, requestId)
+  }
+
+  /**
+   * Deletes every application named in a comma-separated id list.
+   *
+   * @param appIds comma-separated application ids; null, blank entries and surrounding
+   *               whitespace are ignored rather than passed on to the status system
+   */
+  override def deleteApps(appIds: String): Unit = {
+    // The HTTP query parameter is optional, so a missing value arrives here as null.
+    Option(appIds)
+      .map(_.split(",").toSeq)
+      .getOrElse(Seq.empty)
+      .map(_.trim)
+      .filter(_.nonEmpty)
+      .foreach(id => statusSystem.deleteApp(id))
+  }
+
override def getWorkerEventInfo(): String = {
val sb = new StringBuilder
sb.append("======================= Workers Event in Master
========================\n")
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
index ac326c5fb..447a9b1b2 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
@@ -142,4 +142,5 @@ class ApiMasterResource extends ApiRequestContext {
workerList)._2)
sb.toString()
}
+
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
index 7c08e335e..18cab93e8 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
@@ -17,7 +17,8 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
-import javax.ws.rs.{Consumes, GET, Path, Produces}
+import java.util
+import javax.ws.rs.{Consumes, GET, Path, Produces, QueryParam}
import javax.ws.rs.core.MediaType
import scala.collection.JavaConverters._
@@ -27,7 +28,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.celeborn.common.util.Utils
-import org.apache.celeborn.rest.v1.model.{AppDiskUsageData,
AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse,
ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HostnamesResponse}
+import org.apache.celeborn.rest.v1.model.{AppDiskUsageData,
AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse,
ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HandleResponse,
HostnamesResponse}
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.service.deploy.master.Master
@@ -94,4 +95,42 @@ class ApplicationResource extends ApiRequestContext {
def hostnames(): HostnamesResponse = {
new
HostnamesResponse().hostnames(statusSystem.hostnameSet.asScala.toSeq.asJava)
}
+
+  /**
+   * Revises the lost shuffles of an application.
+   *
+   * @param appId       application id, from the `app` query parameter
+   * @param shufflesIds comma-separated shuffle ids, from the `shuffleIds` query parameter
+   * @return a response whose `success` flag is false when a parameter is missing or a
+   *         shuffle id is not an integer, and true once the revision has been handed over
+   */
+  @Path("/reviseLostShuffles")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[HandleResponse]))),
+    description =
+      "Revise lost shuffles")
+  @GET
+  def reviseLostShuffles(
+      @QueryParam("app") appId: String,
+      @QueryParam("shuffleIds") shufflesIds: String): HandleResponse = {
+    // Both query parameters are optional, so either can be null; blank tokens such as the
+    // ones produced by "1,,2" or a trailing comma are dropped instead of failing to parse.
+    val tokens = Option(shufflesIds)
+      .map(_.split(",").toSeq)
+      .getOrElse(Seq.empty)
+      .map(_.trim)
+      .filter(_.nonEmpty)
+    if (appId == null || appId.trim.isEmpty || tokens.isEmpty) {
+      new HandleResponse().success(false).message("app and shuffleIds are required")
+    } else {
+      try {
+        val shuffles = new util.ArrayList[Integer]()
+        tokens.foreach(token => shuffles.add(Integer.valueOf(token)))
+        httpService.reviseLostShuffles(appId, shuffles)
+        new HandleResponse().success(true).message("revise lost shuffle done")
+      } catch {
+        case e: NumberFormatException =>
+          new HandleResponse().success(false).message(s"Invalid shuffle id: ${e.getMessage}")
+      }
+    }
+  }
+
+  /**
+   * Hands the raw `apps` query parameter to the HTTP service's `deleteApps` and reports
+   * success once that call returns.
+   */
+  @Path("/deleteApps")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(
+      new Content(
+        mediaType = MediaType.APPLICATION_JSON,
+        schema = new Schema(implementation = classOf[HandleResponse]))),
+    description = "Delete resource of an app")
+  @GET
+  def deleteApp(@QueryParam("apps") apps: String): HandleResponse = {
+    httpService.deleteApps(apps)
+    val summary = s"delete shuffles of app ${apps}"
+    new HandleResponse().success(true).message(summary)
+  }
+
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
index 4b7319d46..fbdcd920e 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ShuffleResource.scala
@@ -17,6 +17,7 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
+import java.util
import javax.ws.rs.{Consumes, GET, Produces}
import javax.ws.rs.core.MediaType
@@ -44,6 +45,13 @@ class ShuffleResource extends ApiRequestContext {
"List all running shuffle keys of the service. It will return all
running shuffle's key of the cluster.")
@GET
def shuffles: ShufflesResponse = {
- new
ShufflesResponse().shuffleIds(statusSystem.registeredShuffle.asScala.toSeq.asJava)
+ val shuffles = new util.ArrayList[String]()
+ statusSystem.registeredAppAndShuffles.asScala.foreach { shuffleKey =>
+ val appId = shuffleKey._1
+ shuffleKey._2.asScala.foreach { id =>
+ shuffles.add(s"$appId-${id}")
+ }
+ }
+ new ShufflesResponse().shuffleIds(shuffles)
}
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index b571e49f6..65660158e 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -472,11 +472,11 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1,
"testSecret"));
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
- assertEquals(1, statusSystem.registeredShuffle.size());
+ assertEquals(1, statusSystem.registeredAppAndShuffles.size());
assertEquals(1, statusSystem.applicationMetas.size());
statusSystem.handleAppLost(APPID1, getNewReqeustId());
- assertTrue(statusSystem.registeredShuffle.isEmpty());
+ assertTrue(statusSystem.registeredAppAndShuffles.isEmpty());
assertTrue(statusSystem.applicationMetas.isEmpty());
}
@@ -545,11 +545,11 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
- assertEquals(1, statusSystem.registeredShuffle.size());
+ assertEquals(1, statusSystem.registeredAppAndShuffles.size());
statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId());
- assertTrue(statusSystem.registeredShuffle.isEmpty());
+ assertTrue(statusSystem.registeredAppAndShuffles.isEmpty());
}
@Test
@@ -621,17 +621,17 @@ public class DefaultMetaSystemSuiteJ {
shuffleKeysAll.add(shuffleKey);
statusSystem.handleRequestSlots(shuffleKey, HOSTNAME1,
workersToAllocate, getNewReqeustId());
}
- Assert.assertEquals(4, statusSystem.registeredShuffle.size());
+ Assert.assertEquals(4, statusSystem.registeredShuffleCount());
List<String> shuffleKeys1 = new ArrayList<>();
shuffleKeys1.add(shuffleKeysAll.get(0));
statusSystem.handleBatchUnRegisterShuffles(shuffleKeys1,
getNewReqeustId());
- Assert.assertEquals(3, statusSystem.registeredShuffle.size());
+ Assert.assertEquals(3, statusSystem.registeredShuffleCount());
statusSystem.handleBatchUnRegisterShuffles(shuffleKeysAll,
getNewReqeustId());
- Assert.assertTrue(statusSystem.registeredShuffle.isEmpty());
+ Assert.assertTrue(statusSystem.registeredShuffleCount() == 0);
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 851d87c1c..4f2fae78a 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -751,16 +751,16 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(1, STATUSSYSTEM1.registeredShuffle.size());
- Assert.assertEquals(1, STATUSSYSTEM2.registeredShuffle.size());
- Assert.assertEquals(1, STATUSSYSTEM3.registeredShuffle.size());
+ Assert.assertEquals(1, STATUSSYSTEM1.registeredAppAndShuffles.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.registeredAppAndShuffles.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.registeredAppAndShuffles.size());
statusSystem.handleAppLost(APPID1, getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
- Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
- Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM1.registeredAppAndShuffles.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM2.registeredAppAndShuffles.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM3.registeredAppAndShuffles.isEmpty());
}
@Test
@@ -832,16 +832,16 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(1, STATUSSYSTEM1.registeredShuffle.size());
- Assert.assertEquals(1, STATUSSYSTEM2.registeredShuffle.size());
- Assert.assertEquals(1, STATUSSYSTEM3.registeredShuffle.size());
+ Assert.assertEquals(1, STATUSSYSTEM1.registeredAppAndShuffles.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.registeredAppAndShuffles.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.registeredAppAndShuffles.size());
statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
- Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
- Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM1.registeredAppAndShuffles.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM2.registeredAppAndShuffles.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM3.registeredAppAndShuffles.isEmpty());
}
@Test
@@ -919,25 +919,25 @@ public class RatisMasterStatusSystemSuiteJ {
Thread.sleep(3000L);
- Assert.assertEquals(4, STATUSSYSTEM1.registeredShuffle.size());
- Assert.assertEquals(4, STATUSSYSTEM2.registeredShuffle.size());
- Assert.assertEquals(4, STATUSSYSTEM3.registeredShuffle.size());
+ Assert.assertEquals(4, STATUSSYSTEM1.registeredShuffleCount());
+ Assert.assertEquals(4, STATUSSYSTEM2.registeredShuffleCount());
+ Assert.assertEquals(4, STATUSSYSTEM3.registeredShuffleCount());
List<String> shuffleKeys1 = new ArrayList<>();
shuffleKeys1.add(shuffleKeysAll.get(0));
statusSystem.handleBatchUnRegisterShuffles(shuffleKeys1,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffle.size());
- Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffle.size());
- Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffle.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffleCount());
+ Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffleCount());
+ Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffleCount());
statusSystem.handleBatchUnRegisterShuffles(shuffleKeysAll,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
- Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
- Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM1.registeredShuffleCount() == 0);
+ Assert.assertTrue(STATUSSYSTEM2.registeredShuffleCount() == 0);
+ Assert.assertTrue(STATUSSYSTEM3.registeredShuffleCount() == 0);
}
@Test
@@ -1085,21 +1085,21 @@ public class RatisMasterStatusSystemSuiteJ {
@Before
public void resetStatus() {
- STATUSSYSTEM1.registeredShuffle.clear();
+ STATUSSYSTEM1.registeredAppAndShuffles.clear();
STATUSSYSTEM1.hostnameSet.clear();
STATUSSYSTEM1.workers.clear();
STATUSSYSTEM1.appHeartbeatTime.clear();
STATUSSYSTEM1.excludedWorkers.clear();
STATUSSYSTEM1.workerLostEvents.clear();
- STATUSSYSTEM2.registeredShuffle.clear();
+ STATUSSYSTEM2.registeredAppAndShuffles.clear();
STATUSSYSTEM2.hostnameSet.clear();
STATUSSYSTEM2.workers.clear();
STATUSSYSTEM2.appHeartbeatTime.clear();
STATUSSYSTEM2.excludedWorkers.clear();
STATUSSYSTEM2.workerLostEvents.clear();
- STATUSSYSTEM3.registeredShuffle.clear();
+ STATUSSYSTEM3.registeredAppAndShuffles.clear();
STATUSSYSTEM3.hostnameSet.clear();
STATUSSYSTEM3.workers.clear();
STATUSSYSTEM3.appHeartbeatTime.clear();
@@ -1414,6 +1414,19 @@ public class RatisMasterStatusSystemSuiteJ {
STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
}
+  /** Revising eight shuffles through the leader must show up on all three masters. */
+  @Test
+  public void testReviseShuffles() throws InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    statusSystem.handleReviseLostShuffles(
+        "app-1", Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), getNewReqeustId());
+    // Wait before inspecting all three masters, as the other tests in this suite do.
+    Thread.sleep(1000L);
+    // assertEquals takes (expected, actual); the reversed order produced misleading
+    // "expected <actual> but was <8>" failure messages.
+    Assert.assertEquals(8, STATUSSYSTEM1.registeredShuffleCount());
+    Assert.assertEquals(8, STATUSSYSTEM2.registeredShuffleCount());
+    Assert.assertEquals(8, STATUSSYSTEM3.registeredShuffleCount());
+  }
+
@AfterClass
public static void testNotifyLogFailed() {
List<HARaftServer> list = Arrays.asList(RATISSERVER1, RATISSERVER2,
RATISSERVER3);
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
index 281b821b2..2afac4729 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
@@ -27,6 +27,7 @@ import org.apache.celeborn.rest.v1.master.invoker.Pair;
import org.apache.celeborn.rest.v1.model.AppDiskUsageSnapshotsResponse;
import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse;
+import org.apache.celeborn.rest.v1.model.HandleResponse;
import org.apache.celeborn.rest.v1.model.HostnamesResponse;
@@ -48,6 +49,76 @@ public class ApplicationApi extends BaseApi {
super(apiClient);
}
+ /**
+ *
+ * Delete resource of apps
+ * @param apps (optional)
+ * @return HandleResponse
+ * @throws ApiException if fails to make API call
+ */
+ public HandleResponse deleteApps(String apps) throws ApiException {
+ return this.deleteApps(apps, Collections.emptyMap());
+ }
+
+
+ /**
+ *
+ * Delete resource of apps
+ * @param apps (optional)
+ * @param additionalHeaders additionalHeaders for this call
+ * @return HandleResponse
+ * @throws ApiException if fails to make API call
+ */
+ public HandleResponse deleteApps(String apps, Map<String, String>
additionalHeaders) throws ApiException {
+ Object localVarPostBody = null;
+
+ // create path and map variables
+ String localVarPath = "/api/v1/applications/deleteApps";
+
+ StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+ String localVarQueryParameterBaseName;
+ List<Pair> localVarQueryParams = new ArrayList<Pair>();
+ List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+ Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+ Map<String, String> localVarCookieParams = new HashMap<String, String>();
+ Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+ localVarQueryParams.addAll(apiClient.parameterToPair("apps", apps));
+
+ localVarHeaderParams.putAll(additionalHeaders);
+
+
+
+ final String[] localVarAccepts = {
+ "application/json"
+ };
+ final String localVarAccept =
apiClient.selectHeaderAccept(localVarAccepts);
+
+ final String[] localVarContentTypes = {
+
+ };
+ final String localVarContentType =
apiClient.selectHeaderContentType(localVarContentTypes);
+
+ String[] localVarAuthNames = new String[] { "basic" };
+
+ TypeReference<HandleResponse> localVarReturnType = new
TypeReference<HandleResponse>() {};
+ return apiClient.invokeAPI(
+ localVarPath,
+ "GET",
+ localVarQueryParams,
+ localVarCollectionQueryParams,
+ localVarQueryStringJoiner.toString(),
+ localVarPostBody,
+ localVarHeaderParams,
+ localVarCookieParams,
+ localVarFormParams,
+ localVarAccept,
+ localVarContentType,
+ localVarAuthNames,
+ localVarReturnType
+ );
+ }
+
/**
*
* List all running application's LifecycleManager's hostnames of
the cluster.
@@ -249,6 +320,79 @@ public class ApplicationApi extends BaseApi {
);
}
+ /**
+ *
+ * Revise lost shuffles or delete shuffles of an application.
+ * @param app (optional)
+ * @param shuffleIds (optional)
+ * @return HandleResponse
+ * @throws ApiException if fails to make API call
+ */
+ public HandleResponse reviseLostShuffles(String app, String shuffleIds)
throws ApiException {
+ return this.reviseLostShuffles(app, shuffleIds, Collections.emptyMap());
+ }
+
+
+ /**
+ *
+ * Revise lost shuffles or delete shuffles of an application.
+ * @param app (optional)
+ * @param shuffleIds (optional)
+ * @param additionalHeaders additionalHeaders for this call
+ * @return HandleResponse
+ * @throws ApiException if fails to make API call
+ */
+ public HandleResponse reviseLostShuffles(String app, String shuffleIds,
Map<String, String> additionalHeaders) throws ApiException {
+ Object localVarPostBody = null;
+
+ // create path and map variables
+ String localVarPath = "/api/v1/applications/reviseLostShuffles";
+
+ StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+ String localVarQueryParameterBaseName;
+ List<Pair> localVarQueryParams = new ArrayList<Pair>();
+ List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+ Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+ Map<String, String> localVarCookieParams = new HashMap<String, String>();
+ Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+ localVarQueryParams.addAll(apiClient.parameterToPair("app", app));
+ localVarQueryParams.addAll(apiClient.parameterToPair("shuffleIds",
shuffleIds));
+
+ localVarHeaderParams.putAll(additionalHeaders);
+
+
+
+ final String[] localVarAccepts = {
+ "application/json"
+ };
+ final String localVarAccept =
apiClient.selectHeaderAccept(localVarAccepts);
+
+ final String[] localVarContentTypes = {
+
+ };
+ final String localVarContentType =
apiClient.selectHeaderContentType(localVarContentTypes);
+
+ String[] localVarAuthNames = new String[] { "basic" };
+
+ TypeReference<HandleResponse> localVarReturnType = new
TypeReference<HandleResponse>() {};
+ return apiClient.invokeAPI(
+ localVarPath,
+ "GET",
+ localVarQueryParams,
+ localVarCollectionQueryParams,
+ localVarQueryStringJoiner.toString(),
+ localVarPostBody,
+ localVarHeaderParams,
+ localVarCookieParams,
+ localVarFormParams,
+ localVarAccept,
+ localVarContentType,
+ localVarAuthNames,
+ localVarReturnType
+ );
+ }
+
@Override
public <T> T invokeAPI(String url, String method, Object request,
TypeReference<T> returnType, Map<String, String> additionalHeaders) throws
ApiException {
String localVarPath = url.replace(apiClient.getBaseURL(), "");
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
index e1ef4fb8c..4c9dfe81f 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ContainerInfo.java
@@ -44,7 +44,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
ContainerInfo.JSON_PROPERTY_CONTAINER_CLUSTER,
ContainerInfo.JSON_PROPERTY_CONTAINER_TAGS
})
[email protected](value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.7.0")
[email protected](value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
public class ContainerInfo {
public static final String JSON_PROPERTY_CONTAINER_NAME = "containerName";
private String containerName;
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index 7c8d5cd67..60528efcb 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -270,6 +270,52 @@ paths:
schema:
$ref: '#/components/schemas/HostnamesResponse'
+ /api/v1/applications/reviseLostShuffles:
+ get:
+ tags:
+ - Application
+ operationId: reviseLostShuffles
+ description: Revise lost shuffles or delete shuffles of an application.
+ parameters:
+ - name: app
+ in: query
+ required: false
+ schema:
+ type: string
+ - name: shuffleIds
+ in: query
+ required: false
+ schema:
+ type: string
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HandleResponse'
+
+ /api/v1/applications/deleteApps:
+ get:
+ tags:
+ - Application
+ operationId: deleteApps
+ description: Delete resource of apps
+ parameters:
+ - name: apps
+ in: query
+ required: false
+ schema:
+ type: string
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HandleResponse'
+
+
/api/v1/ratis/election/transfer:
post:
tags:
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index bb98e88f6..e8610dfdc 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -194,6 +194,10 @@ abstract class HttpService extends Service with Logging {
def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
+  /**
+   * Revises the lost shuffles of an application.
+   *
+   * @param appId    application whose shuffles are being revised
+   * @param shuffles shuffle ids to revise for `appId`
+   */
+  def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]): Unit
+
+  /**
+   * Deletes the given applications.
+   *
+   * @param appIds application ids as a single string; the master implementation splits it
+   *               on commas
+   */
+  def deleteApps(appIds: String): Unit
+
def startHttpServer(): Unit = {
httpServer = HttpServer(
serviceName,
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
index 3b7d74df8..d689ead7e 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala
@@ -28,7 +28,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.message.StatusCode
-import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.MiniClusterFeature
class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite
@@ -60,8 +59,8 @@ class LifecycleManagerUnregisterShuffleSuite extends
WithShuffleClientSuite
assert(res.status == StatusCode.SUCCESS)
lifecycleManager.registeredShuffle.add(shuffleId)
assert(lifecycleManager.registeredShuffle.contains(shuffleId))
- val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
- assert(masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(APP))
+
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId))
lifecycleManager.commitManager.setStageEnd(shuffleId)
}
@@ -71,9 +70,10 @@ class LifecycleManagerUnregisterShuffleSuite extends
WithShuffleClientSuite
// after unregister shuffle
eventually(timeout(120.seconds), interval(2.seconds)) {
shuffleIds.foreach { shuffleId: Int =>
- val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
assert(!lifecycleManager.registeredShuffle.contains(shuffleId))
-
assert(!masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+ val containShuffleKey =
masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(
+ APP) &&
masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId)
+ assert(!containShuffleKey)
}
}
@@ -93,8 +93,8 @@ class LifecycleManagerUnregisterShuffleSuite extends
WithShuffleClientSuite
assert(res.status == StatusCode.SUCCESS)
lifecycleManager.registeredShuffle.add(shuffleId)
assert(lifecycleManager.registeredShuffle.contains(shuffleId))
- val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
- assert(masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(APP))
+
assert(masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId))
lifecycleManager.commitManager.setStageEnd(shuffleId)
}
val previousTime = System.currentTimeMillis()
@@ -104,9 +104,10 @@ class LifecycleManagerUnregisterShuffleSuite extends
WithShuffleClientSuite
// after unregister shuffle
eventually(timeout(120.seconds), interval(2.seconds)) {
shuffleIds.foreach { shuffleId: Int =>
- val shuffleKey = Utils.makeShuffleKey(APP, shuffleId)
assert(!lifecycleManager.registeredShuffle.contains(shuffleId))
-
assert(!masterInfo._1.statusSystem.registeredShuffle.contains(shuffleKey))
+ val containShuffleKey =
masterInfo._1.statusSystem.registeredAppAndShuffles.containsKey(
+ APP) &&
masterInfo._1.statusSystem.registeredAppAndShuffles.get(APP).contains(shuffleId)
+ assert(!containShuffleKey)
}
}
val currentTime = System.currentTimeMillis()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index b9bfff05b..62602cd06 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -866,6 +866,11 @@ private[celeborn] class Worker(
sb.toString()
}
+  // Revising lost shuffles is not available on this service; calling it always throws.
+  override def reviseLostShuffles(
+      appId: String,
+      shuffles: java.util.List[Integer]): Unit = {
+    throw new UnsupportedOperationException()
+  }
+
+  // Deleting applications is not available on this service; calling it always throws.
+  override def deleteApps(appIds: String): Unit = {
+    throw new UnsupportedOperationException()
+  }
+
override def exit(exitType: String): String = {
exitType.toUpperCase(Locale.ROOT) match {
case "DECOMMISSION" =>