This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 94d7c83ba [CELEBORN-932] Fix worker register after gracefaully restart
94d7c83ba is described below

commit 94d7c83ba00d9bd5d4e1a7c6fb8455c13eab5ae9
Author: onebox-li <[email protected]>
AuthorDate: Mon Sep 11 21:23:28 2023 +0800

    [CELEBORN-932] Fix worker register after gracefaully restart
    
    ### What changes were proposed in this pull request?
    Worker will firstly register failed after worker gracefully restart in HA 
mode, it will be really registered after one heartbeat.
    <img width="889" alt="image" 
src="https://github.com/apache/incubator-celeborn/assets/19429353/371aa0e0-b2e9-4c1f-9e40-276dc1460219";>
    This is because master here uses same `requestId` to submit request,  
causing the second request not be processed correctly, due to Ratis 
`RetryCache`.
    Master logs like below:
    (worker gracefully stop)
    Master: Receive ReportNodeFailure
    (worker start)
    Master: Received RegisterWorker request
    Master: Received heartbeat from unknown worker
    Master: Registered worker
    
    So here improve AbstractMetaManager#updateRegisterWorkerMeta to cover 
`WorkerRemove` logic. For back compatibility and possible inconsistencies 
during rolling upgrade, temporarily fix duplicate requestId and keep remove 
function. And we can try to remove `WorkerRemove` logic in the future version.
    
    ### Why are the changes needed?
    Ditto
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Cluster test
    
    Closes #1863 from onebox-li/fix-restart-register.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 0e53a3d55231967e1bbd8e47ab99906836f4f550)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../service/deploy/master/clustermeta/AbstractMetaManager.java       | 5 +++--
 .../scala/org/apache/celeborn/service/deploy/master/Master.scala     | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index fbf10c2ab..36e60d89a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -192,9 +192,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     synchronized (workers) {
       if (!workers.contains(workerInfo)) {
         workers.add(workerInfo);
-        shutdownWorkers.remove(workerInfo);
-        lostWorkers.remove(workerInfo);
       }
+      shutdownWorkers.remove(workerInfo);
+      lostWorkers.remove(workerInfo);
+      excludedWorkers.remove(workerInfo);
     }
   }
 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 3bdb2fdbb..3ecd8e92a 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -526,7 +526,9 @@ private[celeborn] class Master(
     if (workersSnapShot.contains(workerToRegister)) {
       logWarning(s"Receive RegisterWorker while worker" +
         s" ${workerToRegister.toString()} already exists, re-register.")
+      // TODO: remove `WorkerRemove` because we have improve register logic to 
cover `WorkerRemove`
       statusSystem.handleWorkerRemove(host, rpcPort, pushPort, fetchPort, 
replicatePort, requestId)
+      val newRequestId = MasterClient.genRequestId()
       statusSystem.handleRegisterWorker(
         host,
         rpcPort,
@@ -535,7 +537,7 @@ private[celeborn] class Master(
         replicatePort,
         disks,
         userResourceConsumption,
-        requestId)
+        newRequestId)
       context.reply(RegisterWorkerResponse(true, "Worker in snapshot, 
re-register."))
     } else if (statusSystem.workerLostEvents.contains(workerToRegister)) {
       logWarning(s"Receive RegisterWorker while worker $workerToRegister " +

Reply via email to