This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 7773db9a7 [CELEBORN-668] Report WorkerLost instead of
WorkerUnavailable if graceful shutdown is disabled
7773db9a7 is described below
commit 7773db9a7ddceb7a6e5cf1f148facb4583aa18de
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jun 13 11:30:59 2023 +0800
[CELEBORN-668] Report WorkerLost instead of WorkerUnavailable if
graceful shutdown is disabled
### What changes were proposed in this pull request?
The worker should report WorkerLost instead of WorkerUnavailable in its
shutdown hook if graceful shutdown is disabled.
### Why are the changes needed?
To avoid unnecessary commit file requests from the lifecycle manager, since
this is not a graceful shutdown.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes #1580 from waitinfuture/668.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 76831e805d764432ba7b55fc4b83736ce126a075)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 21 +++++++++++++++++----
.../celeborn/service/deploy/worker/Worker.scala | 20 ++++++++++++++++----
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a137c6f0b..714b78a11 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -224,7 +224,7 @@ private[celeborn] class Master(
val fetchPort = pb.getFetchPort
val replicatePort = pb.getReplicatePort
val requestId = pb.getRequestId
- logDebug(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort.")
+ logDebug(s"Received worker lost
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
executeWithLeaderChecker(
null,
handleWorkerLost(null, host, rpcPort, pushPort, fetchPort,
replicatePort, requestId))
@@ -310,7 +310,7 @@ private[celeborn] class Master(
estimatedAppDiskUsage,
requestId) =>
logDebug(s"Received heartbeat from" +
- s" worker $host:$rpcPort:$pushPort:$fetchPort with $disks.")
+ s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with
$disks.")
executeWithLeaderChecker(
context,
handleHeartbeatFromWorker(
@@ -334,6 +334,18 @@ private[celeborn] class Master(
context,
handleReportNodeUnavailable(context, failedWorkers, requestId))
+ case pb: PbWorkerLost =>
+ val host = pb.getHost
+ val rpcPort = pb.getRpcPort
+ val pushPort = pb.getPushPort
+ val fetchPort = pb.getFetchPort
+ val replicatePort = pb.getReplicatePort
+ val requestId = pb.getRequestId
+ logInfo(s"Received worker lost
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
+ executeWithLeaderChecker(
+ context,
+ handleWorkerLost(context, host, rpcPort, pushPort, fetchPort,
replicatePort, requestId))
+
case CheckQuota(userIdentifier) =>
executeWithLeaderChecker(context, handleCheckQuota(userIdentifier,
context))
}
@@ -420,7 +432,8 @@ private[celeborn] class Master(
val expiredShuffleKeys = new util.HashSet[String]
activeShuffleKeys.asScala.foreach { shuffleKey =>
if (!statusSystem.registeredShuffle.contains(shuffleKey)) {
- logWarning(s"Shuffle $shuffleKey expired on
$host:$rpcPort:$pushPort:$fetchPort.")
+ logWarning(
+ s"Shuffle $shuffleKey expired on
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
expiredShuffleKeys.add(shuffleKey)
}
}
@@ -449,7 +462,7 @@ private[celeborn] class Master(
.find(_ == targetWorker)
.orNull
if (worker == null) {
- logWarning(s"Unknown worker $host:$rpcPort:$pushPort:$fetchPort" +
+ logWarning(s"Unknown worker
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort" +
s" for WorkerLost handler!")
return
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 55cbf737a..361a715ad 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -38,7 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo,
WorkerPartitionLoc
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource,
SystemMiscSource}
import org.apache.celeborn.common.network.TransportContext
-import org.apache.celeborn.common.protocol.{PartitionType,
PbRegisterWorkerResponse, RpcNameConstants, TransportModuleConstants}
+import org.apache.celeborn.common.protocol.{PartitionType,
PbRegisterWorkerResponse, PbWorkerLostResponse, RpcNameConstants,
TransportModuleConstants}
import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc._
@@ -538,9 +538,21 @@ private[celeborn] class Worker(
// add this worker to master's blacklist. When restart, register
worker will
// make master remove this worker from blacklist.
try {
- rssHARetryClient.askSync(
- ReportWorkerUnavailable(List(workerInfo).asJava),
- OneWayMessageResponse.getClass)
+ if (gracefulShutdown) {
+ rssHARetryClient.askSync(
+ ReportWorkerUnavailable(List(workerInfo).asJava),
+ OneWayMessageResponse.getClass)
+ } else {
+ rssHARetryClient.askSync[PbWorkerLostResponse](
+ WorkerLost(
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ RssHARetryClient.genRequestId()),
+ classOf[PbWorkerLostResponse])
+ }
} catch {
case e: Throwable =>
logError(