This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 0e53a3d55 [CELEBORN-932] Fix worker register after gracefully restart
0e53a3d55 is described below
commit 0e53a3d55231967e1bbd8e47ab99906836f4f550
Author: onebox-li <[email protected]>
AuthorDate: Mon Sep 11 21:23:28 2023 +0800
[CELEBORN-932] Fix worker register after gracefully restart
### What changes were proposed in this pull request?
After a worker restarts gracefully in HA mode, its first registration fails.
The worker only becomes registered after its next heartbeat.
<img width="889" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/19429353/371aa0e0-b2e9-4c1f-9e40-276dc1460219">
This is because the master uses the same `requestId` to submit both the
`WorkerRemove` request and the register request, so the second request is not
processed correctly because of the Ratis `RetryCache`.
The master logs look like this:
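A toy model of the effect, for illustration only. This is not the Ratis API: the class `RetryCacheSketch`, its `submit` method, and the string-based requests are all hypothetical, and the real `RetryCache` is more involved. The sketch only shows why a second request that reuses a `requestId` can be answered from the cache without changing any state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of a retry cache keyed by request id; not the Ratis API.
class RetryCacheSketch {
  private final Map<String, String> repliesByRequestId = new HashMap<>();
  private final Set<String> workers = new HashSet<>();

  // Applies the operation only the first time a requestId is seen.
  String submit(String requestId, String op, String worker) {
    String cached = repliesByRequestId.get(requestId);
    if (cached != null) {
      // Treated as a retry of the earlier request: state is left untouched.
      return cached;
    }
    if (op.equals("remove")) {
      workers.remove(worker);
    } else {
      workers.add(worker);
    }
    String reply = op + ":" + worker;
    repliesByRequestId.put(requestId, reply);
    return reply;
  }

  boolean isRegistered(String worker) {
    return workers.contains(worker);
  }
}
```

In this model, submitting `remove` and then `register` with the same id leaves the worker unregistered, while giving the register call a fresh id applies it.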
(worker gracefully stop)
Master: Receive ReportNodeFailure
(worker start)
Master: Received RegisterWorker request
Master: Received heartbeat from unknown worker
Master: Registered worker
This change therefore improves `AbstractMetaManager#updateRegisterWorkerMeta`
to cover the `WorkerRemove` logic. For backward compatibility, and to avoid
possible inconsistencies during a rolling upgrade, it temporarily fixes the
duplicate `requestId` and keeps the remove call. We can try to remove the
`WorkerRemove` logic in a future version.
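A minimal sketch of the updated register behaviour, under simplifying assumptions: the class `RegisterMetaSketch` is hypothetical, plain strings stand in for `WorkerInfo`, and the collections are ordinary `HashSet`s rather than whatever `AbstractMetaManager` actually uses. The point it illustrates is that the cleanup of the shutdown, lost and excluded sets now runs on every register, whether or not the worker was already known.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the worker bookkeeping; strings replace WorkerInfo.
class RegisterMetaSketch {
  final Set<String> workers = new HashSet<>();
  final Set<String> shutdownWorkers = new HashSet<>();
  final Set<String> lostWorkers = new HashSet<>();
  final Set<String> excludedWorkers = new HashSet<>();

  void register(String worker) {
    synchronized (workers) {
      if (!workers.contains(worker)) {
        workers.add(worker);
      }
      // Cleanup is unconditional, so re-registering a known worker
      // also clears any stale shutdown/lost/excluded state.
      shutdownWorkers.remove(worker);
      lostWorkers.remove(worker);
      excludedWorkers.remove(worker);
    }
  }
}
```

With the previous placement of the cleanup inside the `if`, a worker still present in `workers` would have kept its entry in `shutdownWorkers` after re-registering.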
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test
Closes #1863 from onebox-li/fix-restart-register.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../service/deploy/master/clustermeta/AbstractMetaManager.java | 5 +++--
.../scala/org/apache/celeborn/service/deploy/master/Master.scala | 4 +++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index fbf10c2ab..36e60d89a 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -192,9 +192,10 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
synchronized (workers) {
if (!workers.contains(workerInfo)) {
workers.add(workerInfo);
- shutdownWorkers.remove(workerInfo);
- lostWorkers.remove(workerInfo);
}
+ shutdownWorkers.remove(workerInfo);
+ lostWorkers.remove(workerInfo);
+ excludedWorkers.remove(workerInfo);
}
}
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 39e030e98..b06d4684c 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -523,7 +523,9 @@ private[celeborn] class Master(
if (workersSnapShot.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker" +
s" ${workerToRegister.toString()} already exists, re-register.")
+ // TODO: remove `WorkerRemove` because we have improve register logic to cover `WorkerRemove`
statusSystem.handleWorkerRemove(host, rpcPort, pushPort, fetchPort, replicatePort, requestId)
+ val newRequestId = MasterClient.genRequestId()
statusSystem.handleRegisterWorker(
host,
rpcPort,
@@ -532,7 +534,7 @@ private[celeborn] class Master(
replicatePort,
disks,
userResourceConsumption,
- requestId)
+ newRequestId)
context.reply(RegisterWorkerResponse(true, "Worker in snapshot, re-register."))
} else if (statusSystem.workerLostEvents.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker $workerToRegister " +