This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 931880a82 [CELEBORN-1112] Inform celeborn application is shutdown, 
then celeborn cluster can release resource immediately
931880a82 is described below

commit 931880a82d1024158f0e888d4377f73f8a07effb
Author: Shuang <[email protected]>
AuthorDate: Wed Nov 8 20:46:51 2023 +0800

    [CELEBORN-1112] Inform celeborn application is shutdown, then celeborn 
cluster can release resource immediately
    
    ### What changes were proposed in this pull request?
    Unregister the application from the Celeborn master after the application 
has stopped, so that the master expires the related shuffle resources 
immediately, resulting in resource savings.
    
    ### Why are the changes needed?
    Currently the Celeborn master expires an application's shuffle resources 
only after the application is detected as timed out.
    This greatly delays the release of resources, which is not conducive 
to saving resources.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    PASS GA
    
    Closes #2075 from RexXiong/CELEBORN-1112.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/client/ApplicationHeartbeater.scala   | 32 ++++++++++++++++++++--
 .../apache/celeborn/client/LifecycleManager.scala  |  8 ++++++
 .../org/apache/celeborn/common/CelebornConf.scala  | 11 ++++++++
 .../apache/celeborn/common/rpc/RpcEndpoint.scala   |  2 +-
 docs/configuration/client.md                       |  1 +
 .../celeborn/service/deploy/master/Master.scala    |  3 +-
 6 files changed, 52 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index e117ec3e1..67692da74 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration.DurationInt
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.internal.Logging
-import 
org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication,
 HeartbeatFromApplicationResponse, ZERO_UUID}
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, 
ApplicationLostResponse, HeartbeatFromApplication, 
HeartbeatFromApplicationResponse, ZERO_UUID}
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.util.{ThreadUtils, Utils}
 
@@ -36,8 +36,11 @@ class ApplicationHeartbeater(
     shuffleMetrics: () => (Long, Long),
     workerStatusTracker: WorkerStatusTracker) extends Logging {
 
+  private var stopped = false
+
   // Use independent app heartbeat threads to avoid being blocked by other 
operations.
   private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
+  private val applicationUnregisterEnabled = conf.applicationUnregisterEnabled
   private val appHeartbeatHandlerThread =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-app-heartbeat")
   private var appHeartbeat: ScheduledFuture[_] = _
@@ -96,8 +99,31 @@ class ApplicationHeartbeater(
     }
   }
 
+  private def unregisterApplication(): Unit = {
+    try {
+      // Then unregister Application
+      val response = masterClient.askSync[ApplicationLostResponse](
+        ApplicationLost(appId),
+        classOf[ApplicationLostResponse])
+      logInfo(s"Unregister Application $appId with response status: 
${response.status}")
+    } catch {
+      case e: Exception =>
+        logWarning("AskSync unRegisterApplication failed.", e)
+    }
+  }
+
   def stop(): Unit = {
-    appHeartbeat.cancel(true)
-    ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis)
+    stopped.synchronized {
+      if (!stopped) {
+        // Stop appHeartbeat first
+        logInfo(s"Stop Application heartbeat $appId")
+        appHeartbeat.cancel(true)
+        ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis)
+        if (applicationUnregisterEnabled) {
+          unregisterApplication()
+        }
+        stopped = true
+      }
+    }
   }
 }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9f1513c32..9f0ca9da4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1202,4 +1202,12 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
 
   // Initialize at the end of LifecycleManager construction.
   initialize()
+
+  /**
+   * A convenient method to stop [[RpcEndpoint]].
+   */
+  override def stop(): Unit = {
+    heartbeater.stop()
+    super.stop()
+  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 06e815f98..2e31f7827 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -729,6 +729,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
   def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
+  def applicationUnregisterEnabled: Boolean = 
get(APPLICATION_UNREGISTER_ENABLED)
+
   def clientCheckedUseAllocatedWorkers: Boolean = 
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
   def clientExcludedWorkerExpireTimeout: Long = 
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
   def clientExcludeReplicaOnFailureEnabled: Boolean =
@@ -2900,6 +2902,15 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("10s")
 
+  val APPLICATION_UNREGISTER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.application.unregister.enabled")
+      .categories("client")
+      .version("0.3.2")
+      .doc("When true, Celeborn client will inform celeborn master the 
application is already shutdown during client " +
+        "exit, this allows the cluster to release resources immediately, 
resulting in resource savings.")
+      .booleanConf
+      .createWithDefault(true)
+
   val CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled")
       .categories("client")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
index 4dc466af9..a0e080396 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala
@@ -128,7 +128,7 @@ trait RpcEndpoint {
   /**
    * A convenient method to stop [[RpcEndpoint]].
    */
-  final def stop(): Unit = {
+  def stop(): Unit = {
     val _self = self
     if (_self != null) {
       rpcEnv.stop(_self)
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f16e43193..1c1d4e7a6 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -20,6 +20,7 @@ license: |
 | Key | Default | Description | Since |
 | --- | ------- | ----------- | ----- |
 | celeborn.client.application.heartbeatInterval | 10s | Interval for client to 
send heartbeat message to master. | 0.3.0 | 
+| celeborn.client.application.unregister.enabled | true | When true, Celeborn 
client will inform celeborn master the application is already shutdown during 
client exit, this allows the cluster to release resources immediately, 
resulting in resource savings. | 0.3.2 | 
 | celeborn.client.closeIdleConnections | true | Whether client will close idle 
connections. | 0.3.0 | 
 | celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, 
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | 
 | celeborn.client.eagerlyCreateInputStream.threads | 32 | Threads count for 
streamCreatorPool in CelebornShuffleReader. | 0.3.1 | 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9f2d5d26b..1c03903a7 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -372,7 +372,8 @@ private[celeborn] class Master(
         handleUnregisterShuffle(context, applicationId, shuffleId, requestId))
 
     case ApplicationLost(appId, requestId) =>
-      logDebug(s"Received ApplicationLost request $requestId, $appId.")
+      logDebug(
+        s"Received ApplicationLost request $requestId, $appId from 
${context.senderAddress}.")
       executeWithLeaderChecker(context, handleApplicationLost(context, appId, 
requestId))
 
     case HeartbeatFromWorker(

Reply via email to