This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 7773db9a7 [CELEBORN-668] Report WorkerLost instead of 
WorkerUnavailable if grac…
7773db9a7 is described below

commit 7773db9a7ddceb7a6e5cf1f148facb4583aa18de
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jun 13 11:30:59 2023 +0800

    [CELEBORN-668] Report WorkerLost instead of WorkerUnavailable if graceful shutdown is disabled
    
    ### What changes were proposed in this pull request?
    Worker should report WorkerLost instead of WorkerUnavailable in its 
shutdown hook if graceful shutdown is disabled.
    
    ### Why are the changes needed?
    To avoid unnecessary commit file requests from the lifecycle manager, since 
the worker is not shutting down gracefully.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    Closes #1580 from waitinfuture/668.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 76831e805d764432ba7b55fc4b83736ce126a075)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/service/deploy/master/Master.scala     | 21 +++++++++++++++++----
 .../celeborn/service/deploy/worker/Worker.scala     | 20 ++++++++++++++++----
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a137c6f0b..714b78a11 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -224,7 +224,7 @@ private[celeborn] class Master(
       val fetchPort = pb.getFetchPort
       val replicatePort = pb.getReplicatePort
       val requestId = pb.getRequestId
-      logDebug(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort.")
+      logDebug(s"Received worker lost 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
       executeWithLeaderChecker(
         null,
         handleWorkerLost(null, host, rpcPort, pushPort, fetchPort, 
replicatePort, requestId))
@@ -310,7 +310,7 @@ private[celeborn] class Master(
           estimatedAppDiskUsage,
           requestId) =>
       logDebug(s"Received heartbeat from" +
-        s" worker $host:$rpcPort:$pushPort:$fetchPort with $disks.")
+        s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with 
$disks.")
       executeWithLeaderChecker(
         context,
         handleHeartbeatFromWorker(
@@ -334,6 +334,18 @@ private[celeborn] class Master(
         context,
         handleReportNodeUnavailable(context, failedWorkers, requestId))
 
+    case pb: PbWorkerLost =>
+      val host = pb.getHost
+      val rpcPort = pb.getRpcPort
+      val pushPort = pb.getPushPort
+      val fetchPort = pb.getFetchPort
+      val replicatePort = pb.getReplicatePort
+      val requestId = pb.getRequestId
+      logInfo(s"Received worker lost 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
+      executeWithLeaderChecker(
+        context,
+        handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, 
replicatePort, requestId))
+
     case CheckQuota(userIdentifier) =>
       executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, 
context))
   }
@@ -420,7 +432,8 @@ private[celeborn] class Master(
     val expiredShuffleKeys = new util.HashSet[String]
     activeShuffleKeys.asScala.foreach { shuffleKey =>
       if (!statusSystem.registeredShuffle.contains(shuffleKey)) {
-        logWarning(s"Shuffle $shuffleKey expired on 
$host:$rpcPort:$pushPort:$fetchPort.")
+        logWarning(
+          s"Shuffle $shuffleKey expired on 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
         expiredShuffleKeys.add(shuffleKey)
       }
     }
@@ -449,7 +462,7 @@ private[celeborn] class Master(
       .find(_ == targetWorker)
       .orNull
     if (worker == null) {
-      logWarning(s"Unknown worker $host:$rpcPort:$pushPort:$fetchPort" +
+      logWarning(s"Unknown worker 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort" +
         s" for WorkerLost handler!")
       return
     }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 55cbf737a..361a715ad 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -38,7 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, 
WorkerPartitionLoc
 import org.apache.celeborn.common.metrics.MetricsSystem
 import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, 
SystemMiscSource}
 import org.apache.celeborn.common.network.TransportContext
-import org.apache.celeborn.common.protocol.{PartitionType, 
PbRegisterWorkerResponse, RpcNameConstants, TransportModuleConstants}
+import org.apache.celeborn.common.protocol.{PartitionType, 
PbRegisterWorkerResponse, PbWorkerLostResponse, RpcNameConstants, 
TransportModuleConstants}
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc._
@@ -538,9 +538,21 @@ private[celeborn] class Worker(
         // add this worker to master's blacklist. When restart, register 
worker will
         // make master remove this worker from blacklist.
         try {
-          rssHARetryClient.askSync(
-            ReportWorkerUnavailable(List(workerInfo).asJava),
-            OneWayMessageResponse.getClass)
+          if (gracefulShutdown) {
+            rssHARetryClient.askSync(
+              ReportWorkerUnavailable(List(workerInfo).asJava),
+              OneWayMessageResponse.getClass)
+          } else {
+            rssHARetryClient.askSync[PbWorkerLostResponse](
+              WorkerLost(
+                host,
+                rpcPort,
+                pushPort,
+                fetchPort,
+                replicatePort,
+                RssHARetryClient.genRequestId()),
+              classOf[PbWorkerLostResponse])
+          }
         } catch {
           case e: Throwable =>
             logError(

Reply via email to