This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new 85b3b2d  [DS-7686][Server]fix restart server after kill force (#7689)
85b3b2d is described below

commit 85b3b2d29d374016dd9a6b2d49153c0371ab8c1f
Author: wind <[email protected]>
AuthorDate: Tue Dec 28 22:34:14 2021 +0800

    [DS-7686][Server]fix restart server after kill force (#7689)
    
    * [DS-7686][Server]fix restart server after kill force
    
    * update registry logic
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../master/registry/MasterRegistryClient.java      | 21 +++++++++------
 .../worker/registry/WorkerRegistryClient.java      | 30 +++++++++++++---------
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 425d6e9..2243e90 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -119,14 +119,6 @@ public class MasterRegistryClient {
             registryClient.getLock(nodeLock);
             // master registry
             registry();
-            String registryPath = getMasterPath();
-            
registryClient.handleDeadServer(Collections.singleton(registryPath), 
NodeType.MASTER, Constants.DELETE_OP);
-
-            // init system node
-
-            while (!registryClient.checkNodeExists(NetUtils.getHost(), 
NodeType.MASTER)) {
-                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
-            }
 
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new 
MasterRegistryDataListener());
         } catch (Exception e) {
@@ -524,7 +516,20 @@ public class MasterRegistryClient {
                 Constants.MASTER_TYPE,
                 registryClient);
 
+        // remove before persist
+        registryClient.remove(localNodePath);
         registryClient.persistEphemeral(localNodePath, 
heartBeatTask.getHeartBeatInfo());
+
+        while (!registryClient.checkNodeExists(NetUtils.getHost(), 
NodeType.MASTER)) {
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+        }
+
+        // sleep 1s, waiting master failover remove
+        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+
+        // delete dead server
+        registryClient.handleDeadServer(Collections.singleton(localNodePath), 
NodeType.MASTER, Constants.DELETE_OP);
+
         registryClient.addConnectionStateListener(this::handleConnectionState);
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("master node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, masterHeartbeatInterval);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 74af482..7f72d1a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -102,8 +102,21 @@ public class WorkerRegistryClient {
         Set<String> workerZkPaths = getWorkerZkPaths();
         int workerHeartbeatInterval = 
workerConfig.getWorkerHeartbeatInterval();
 
+        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
+                workerConfig.getWorkerMaxCpuloadAvg(),
+                workerConfig.getWorkerReservedMemory(),
+                workerConfig.getHostWeight(),
+                workerZkPaths,
+                Constants.WORKER_TYPE,
+                registryClient,
+                workerConfig.getWorkerExecThreads(),
+                workerManagerThread
+        );
+
         for (String workerZKPath : workerZkPaths) {
-            registryClient.persistEphemeral(workerZKPath, "");
+            // remove before persist
+            registryClient.remove(workerZKPath);
+            registryClient.persistEphemeral(workerZKPath, 
heartBeatTask.getHeartBeatInfo());
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
         }
 
@@ -111,21 +124,14 @@ public class WorkerRegistryClient {
             ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
 
+        // sleep 1s, waiting master failover remove
+        ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+
+        // delete dead server
         this.handleDeadServer(workerZkPaths, NodeType.WORKER, 
Constants.DELETE_OP);
 
         registryClient.addConnectionStateListener(this::handleConnectionState);
 
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                workerConfig.getWorkerMaxCpuloadAvg(),
-                workerConfig.getWorkerReservedMemory(),
-                workerConfig.getHostWeight(),
-                workerZkPaths,
-                Constants.WORKER_TYPE,
-                registryClient,
-                workerConfig.getWorkerExecThreads(),
-                workerManagerThread
-        );
-
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, 
workerHeartbeatInterval);
     }

Reply via email to