This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 931880a82 [CELEBORN-1112] Inform celeborn application is shutdown,
then celeborn cluster can release resource immediately
931880a82 is described below
commit 931880a82d1024158f0e888d4377f73f8a07effb
Author: Shuang <[email protected]>
AuthorDate: Wed Nov 8 20:46:51 2023 +0800
[CELEBORN-1112] Inform celeborn application is shutdown, then celeborn
cluster can release resource immediately
### What changes were proposed in this pull request?
Unregister the application from the Celeborn master after the application stops, so
that the master expires the related shuffle resources immediately, resulting in
resource savings.
### Why are the changes needed?
Currently, the Celeborn master expires an application's shuffle resources
only when the application is detected as timed out,
which can greatly delay the release of resources and is not conducive
to saving resources.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
PASS GA
Closes #2075 from RexXiong/CELEBORN-1112.
Authored-by: Shuang <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/client/ApplicationHeartbeater.scala | 32 ++++++++++++++++++++--
.../apache/celeborn/client/LifecycleManager.scala | 8 ++++++
.../org/apache/celeborn/common/CelebornConf.scala | 11 ++++++++
.../apache/celeborn/common/rpc/RpcEndpoint.scala | 2 +-
docs/configuration/client.md | 1 +
.../celeborn/service/deploy/master/Master.scala | 3 +-
6 files changed, 52 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index e117ec3e1..67692da74 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration.DurationInt
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
-import
org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ZERO_UUID}
+import
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost,
ApplicationLostResponse, HeartbeatFromApplication,
HeartbeatFromApplicationResponse, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
@@ -36,8 +36,11 @@ class ApplicationHeartbeater(
shuffleMetrics: () => (Long, Long),
workerStatusTracker: WorkerStatusTracker) extends Logging {
+ private var stopped = false
+
// Use independent app heartbeat threads to avoid being blocked by other
operations.
private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
+ private val applicationUnregisterEnabled = conf.applicationUnregisterEnabled
private val appHeartbeatHandlerThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-app-heartbeat")
private var appHeartbeat: ScheduledFuture[_] = _
@@ -96,8 +99,31 @@ class ApplicationHeartbeater(
}
}
+ private def unregisterApplication(): Unit = {
+ try {
+ // Best-effort: tell the master this app is gone so it can release its shuffle resources now.
+ val response = masterClient.askSync[ApplicationLostResponse](
+ ApplicationLost(appId),
+ classOf[ApplicationLostResponse])
+ logInfo(s"Unregister Application $appId with response status:
${response.status}")
+ } catch {
+ case e: Exception =>
+ logWarning(s"AskSync unRegisterApplication for $appId failed.", e)
+ }
+ }
+
def stop(): Unit = {
- appHeartbeat.cancel(true)
- ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis)
+ this.synchronized { // not `stopped.synchronized`: that locks a shared, boxed java.lang.Boolean
+ if (!stopped) {
+ // Stop appHeartbeat first
+ logInfo(s"Stop Application heartbeat $appId")
+ Option(appHeartbeat).foreach(_.cancel(true)) // appHeartbeat is null until it is assigned
+ ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis)
+ if (applicationUnregisterEnabled) {
+ unregisterApplication()
+ }
+ stopped = true
+ }
+ }
}
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9f1513c32..9f0ca9da4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1202,4 +1202,12 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
// Initialize at the end of LifecycleManager construction.
initialize()
+
+ /**
+ * Stops `heartbeater` first, then stops this [[RpcEndpoint]] via `super.stop()`.
+ */
+ override def stop(): Unit = {
+ heartbeater.stop()
+ super.stop()
+ }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 06e815f98..2e31f7827 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -729,6 +729,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
+ def applicationUnregisterEnabled: Boolean =
get(APPLICATION_UNREGISTER_ENABLED)
+
def clientCheckedUseAllocatedWorkers: Boolean =
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
def clientExcludedWorkerExpireTimeout: Long =
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
def clientExcludeReplicaOnFailureEnabled: Boolean =
@@ -2900,6 +2902,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
+ val APPLICATION_UNREGISTER_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.application.unregister.enabled")
+ .categories("client")
+ .version("0.3.2")
+ .doc("When true, Celeborn client will inform celeborn master the
application is already shutdown during client " +
+ "exit, this allows the cluster to release resources immediately,
resulting in resource savings.")
+ .booleanConf
+ .createWithDefault(true)
+
val CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled")
.categories("client")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
index 4dc466af9..a0e080396 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
@@ -128,7 +128,7 @@ trait RpcEndpoint {
/**
* A convenient method to stop [[RpcEndpoint]].
*/
- final def stop(): Unit = {
+ def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f16e43193..1c1d4e7a6 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -20,6 +20,7 @@ license: |
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.client.application.heartbeatInterval | 10s | Interval for client to
send heartbeat message to master. | 0.3.0 |
+| celeborn.client.application.unregister.enabled | true | When true, Celeborn
client will inform celeborn master the application is already shutdown during
client exit, this allows the cluster to release resources immediately,
resulting in resource savings. | 0.3.2 |
| celeborn.client.closeIdleConnections | true | Whether client will close idle
connections. | 0.3.0 |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | When true,
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 |
| celeborn.client.eagerlyCreateInputStream.threads | 32 | Threads count for
streamCreatorPool in CelebornShuffleReader. | 0.3.1 |
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9f2d5d26b..1c03903a7 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -372,7 +372,8 @@ private[celeborn] class Master(
handleUnregisterShuffle(context, applicationId, shuffleId, requestId))
case ApplicationLost(appId, requestId) =>
- logDebug(s"Received ApplicationLost request $requestId, $appId.")
+ logDebug(
+ s"Received ApplicationLost request $requestId, $appId from
${context.senderAddress}.")
executeWithLeaderChecker(context, handleApplicationLost(context, appId,
requestId))
case HeartbeatFromWorker(