This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 43ba29a  [Improvement-4624] When the server exists in the ZK dead server list, it needs to stop its service by itself (#4626)
43ba29a is described below

commit 43ba29a2dd72a480d582258323ec5d062333fe96
Author: lgcareer <[email protected]>
AuthorDate: Tue Mar 2 14:05:00 2021 +0800

    [Improvement-4624] When the server exists in the ZK dead server list, it needs to stop its service by itself (#4626)
    
    * [Improvement-4624] When the server exists in the ZK dead server list, it needs to stop its service by itself
    
    * [Improvement-4624] Fix checkstyle issues and add MasterRegistryTest
    
    * [Improvement-4624] Fix checkstyle issues and add ZookeeperRegistryCenterTest
    
    * [Improvement-4624] Fix checkstyle issues and add ZookeeperRegistryCenterTest
    
    * [Improvement-4624]add RegisterOperatorTest
    
    * [Improvement-4624]update RegisterOperatorTest
    
    * [Improvement-4624]resolve code smell
    
    * [Improvement-4624]revert LICENSE-@form-create-element-ui
---
 .../server/master/MasterServer.java                |  69 ++++++---
 .../dispatch/host/LowerWeightHostManager.java      |   2 +-
 .../server/master/registry/MasterRegistry.java     |  38 ++---
 .../server/registry/HeartBeatTask.java             |  33 ++++-
 .../server/registry/ZookeeperNodeManager.java      |   4 +-
 .../server/registry/ZookeeperRegistryCenter.java   |  92 +++++++++---
 .../server/worker/WorkerServer.java                |  74 ++++++----
 .../server/worker/registry/WorkerRegistry.java     |  36 +++--
 .../dolphinscheduler/server/zk/ZKMasterClient.java |   3 +
 .../consumer/TaskPriorityQueueConsumerTest.java    |   4 +-
 .../server/master/registry/MasterRegistryTest.java |   7 +-
 .../registry/ZookeeperRegistryCenterTest.java      |  61 ++++++++
 .../worker/processor/TaskCallbackServiceTest.java  |   4 +-
 .../server/worker/registry/WorkerRegistryTest.java |  44 ++++--
 .../service/zk/AbstractZKClient.java               |  83 +----------
 .../service/zk/RegisterOperator.java               | 155 +++++++++++++++++++++
 .../service/zk/ZookeeperCachedOperator.java        |   3 +
 .../service/zk/RegisterOperatorTest.java           | 116 +++++++++++++++
 pom.xml                                            |   2 +
 19 files changed, 619 insertions(+), 211 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 18882a2..c2ea2c4 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -14,9 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,6 +27,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.server.worker.WorkerServer;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -42,13 +45,10 @@ import 
org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.FilterType;
 
-
-
-
 @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
         @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = 
{WorkerServer.class})
 })
-public class MasterServer {
+public class MasterServer implements IStoppable {
 
     /**
      * logger of MasterServer
@@ -62,8 +62,8 @@ public class MasterServer {
     private MasterConfig masterConfig;
 
     /**
-     *  spring application context
-     *  only use it for initialization
+     * spring application context
+     * only use it for initialization
      */
     @Autowired
     private SpringApplicationContext springApplicationContext;
@@ -74,6 +74,12 @@ public class MasterServer {
     private NettyRemotingServer nettyRemotingServer;
 
     /**
+     * master registry
+     */
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    /**
      * zk master client
      */
     @Autowired
@@ -87,8 +93,9 @@ public class MasterServer {
 
     /**
      * master server startup
-     *
+     * <p>
      * master server not use web service
+     *
      * @param args arguments
      */
     public static void main(String[] args) {
@@ -100,16 +107,23 @@ public class MasterServer {
      * run master server
      */
     @PostConstruct
-    public void run(){
+    public void run() {
+        try {
+            //init remoting server
+            NettyServerConfig serverConfig = new NettyServerConfig();
+            serverConfig.setListenPort(masterConfig.getListenPort());
+            this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
+            this.nettyRemotingServer.start();
 
-        //init remoting server
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(masterConfig.getListenPort());
-        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
-        this.nettyRemotingServer.start();
+            
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
 
         // self tolerant
         this.zkMasterClient.start();
@@ -137,7 +151,9 @@ public class MasterServer {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                close("shutdownHook");
+                if (Stopper.isRunning()) {
+                    close("shutdownHook");
+                }
             }
         }));
 
@@ -145,13 +161,14 @@ public class MasterServer {
 
     /**
      * gracefully close
+     *
      * @param cause close cause
      */
     public void close(String cause) {
 
         try {
             //execute only once
-            if(Stopper.isStopped()){
+            if (Stopper.isStopped()) {
                 return;
             }
 
@@ -163,24 +180,32 @@ public class MasterServer {
             try {
                 //thread sleep 3 seconds for thread quietly stop
                 Thread.sleep(3000L);
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.warn("thread sleep exception ", e);
             }
             //
             this.masterSchedulerService.close();
             this.nettyRemotingServer.close();
+            this.masterRegistry.unRegistry();
             this.zkMasterClient.close();
             //close quartz
-            try{
+            try {
                 QuartzExecutors.getInstance().shutdown();
                 logger.info("Quartz service stopped");
-            }catch (Exception e){
-                logger.warn("Quartz service stopped 
exception:{}",e.getMessage());
+            } catch (Exception e) {
+                logger.warn("Quartz service stopped exception:{}", 
e.getMessage());
             }
+
         } catch (Exception e) {
             logger.error("master server stop exception ", e);
+        } finally {
             System.exit(-1);
         }
     }
+
+    @Override
+    public void stop(String cause) {
+        close(cause);
+    }
 }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 1872ae0..ac7d8b0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -149,7 +149,7 @@ public class LowerWeightHostManager extends 
CommonHostManager {
                     String workerGroupPath = 
registryCenter.getWorkerGroupPath(workerGroup);
                     Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
                     for(String node : nodes){
-                        String heartbeat = 
registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
+                        String heartbeat = 
registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
                         if(StringUtils.isNotEmpty(heartbeat)
                                 && heartbeat.split(COMMA).length == 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
                             String[] parts = heartbeat.split(COMMA);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 37d6e72..b492395 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
@@ -24,9 +25,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 
 import java.util.Date;
 import java.util.concurrent.Executors;
@@ -84,30 +83,29 @@ public class MasterRegistry {
     public void registry() {
         String address = NetUtils.getHost();
         String localNodePath = getMasterPath();
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
 "");
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
 ConnectionStateListener() {
-            @Override
-            public void stateChanged(CuratorFramework client, ConnectionState 
newState) {
+        
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
+        
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+            (client, newState) -> {
                 if (newState == ConnectionState.LOST) {
                     logger.error("master : {} connection lost from zookeeper", 
address);
                 } else if (newState == ConnectionState.RECONNECTED) {
                     logger.info("master : {} reconnected to zookeeper", 
address);
-                    
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
 "");
+                    
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
                 } else if (newState == ConnectionState.SUSPENDED) {
                     logger.warn("master : {} connection SUSPENDED ", address);
+                    
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
                 }
-            }
-        });
+            });
         int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
                 masterConfig.getMasterReservedMemory(),
                 masterConfig.getMasterMaxCpuloadAvg(),
                 Sets.newHashSet(getMasterPath()),
+                Constants.MASTER_PREFIX,
                 zookeeperRegistryCenter);
 
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, 
masterHeartbeatInterval, TimeUnit.SECONDS);
-        logger.info("master node : {} registry to ZK path {} successfully with 
heartBeatInterval : {}s"
-                , address, localNodePath, masterHeartbeatInterval);
+        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("master node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, masterHeartbeatInterval);
     }
 
     /**
@@ -116,16 +114,14 @@ public class MasterRegistry {
     public void unRegistry() {
         String address = getLocalAddress();
         String localNodePath = getMasterPath();
-        heartBeatExecutor.shutdownNow();
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
-        logger.info("master node : {} unRegistry from ZK path {}."
-                , address, localNodePath);
+        zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
+        logger.info("master node : {} unRegistry to ZK.", address);
     }
 
     /**
      * get master path
      */
-    private String getMasterPath() {
+    public String getMasterPath() {
         String address = getLocalAddress();
         return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
     }
@@ -139,4 +135,12 @@ public class MasterRegistry {
 
     }
 
+    /**
+     * get zookeeper registry center
+     * @return ZookeeperRegistryCenter
+     */
+    public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+        return zookeeperRegistryCenter;
+    }
+
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index b89d851..a12583b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.registry;
 import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 
@@ -29,7 +30,10 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HeartBeatTask extends Thread {
+/**
+ * Heart beat task
+ */
+public class HeartBeatTask implements Runnable {
 
     private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
 
@@ -37,23 +41,39 @@ public class HeartBeatTask extends Thread {
     private double reservedMemory;
     private double maxCpuloadAvg;
     private Set<String> heartBeatPaths;
+    private String serverType;
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
+    /**
+     * server stop or not
+     */
+    protected IStoppable stoppable = null;
 
     public HeartBeatTask(String startTime,
                          double reservedMemory,
                          double maxCpuloadAvg,
                          Set<String> heartBeatPaths,
+                         String serverType,
                          ZookeeperRegistryCenter zookeeperRegistryCenter) {
         this.startTime = startTime;
         this.reservedMemory = reservedMemory;
         this.maxCpuloadAvg = maxCpuloadAvg;
         this.heartBeatPaths = heartBeatPaths;
         this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+        this.serverType = serverType;
     }
 
     @Override
     public void run() {
         try {
+
+            // check dead or not in zookeeper
+            for (String heartBeatPath : heartBeatPaths) {
+                if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, 
serverType)) {
+                    zookeeperRegistryCenter.getStoppable().stop("i was judged 
to death, release resources and stop myself");
+                    return;
+                }
+            }
+
             double availablePhysicalMemorySize = 
OSUtils.availablePhysicalMemorySize();
             double loadAverage = OSUtils.loadAverage();
 
@@ -79,10 +99,19 @@ public class HeartBeatTask extends Thread {
             builder.append(OSUtils.getProcessID());
 
             for (String heartBeatPath : heartBeatPaths) {
-                
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, 
builder.toString());
+                
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, 
builder.toString());
             }
         } catch (Throwable ex) {
             logger.error("error write heartbeat info", ex);
         }
     }
+
+    /**
+     * for stop server
+     *
+     * @param serverStoppable server stoppable interface
+     */
+    public void setStoppable(IStoppable serverStoppable) {
+        this.stoppable = serverStoppable;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index b1a5ede..4dfdb80 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements 
InitializingBean {
         /**
          * init MasterNodeListener listener
          */
-        registryCenter.getZookeeperCachedOperator().addListener(new 
MasterNodeListener());
+        registryCenter.getRegisterOperator().addListener(new 
MasterNodeListener());
         /**
          * init WorkerNodeListener listener
          */
-        registryCenter.getZookeeperCachedOperator().addListener(new 
WorkerGroupNodeListener());
+        registryCenter.getRegisterOperator().addListener(new 
WorkerGroupNodeListener());
     }
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 3ca62be..9017a13 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -17,19 +17,27 @@
 
 package org.apache.dolphinscheduler.server.registry;
 
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
 /**
- *  zookeeper register center
+ * zookeeper register center
  */
 @Service
 public class ZookeeperRegistryCenter implements InitializingBean {
@@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
 
     @Autowired
-    protected ZookeeperCachedOperator zookeeperCachedOperator;
-
+    protected RegisterOperator registerOperator;
     @Autowired
-    private  ZookeeperConfig zookeeperConfig;
+    private ZookeeperConfig zookeeperConfig;
 
     /**
      * nodes namespace
@@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     public final String EMPTY = "";
 
+    private IStoppable stoppable;
+
     @Override
     public void afterPropertiesSet() throws Exception {
         NODES = zookeeperConfig.getDsRoot() + "/nodes";
@@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
      * init nodes
      */
     private void initNodes() {
-        zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
-        zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
+        registerOperator.persist(MASTER_PATH, EMPTY);
+        registerOperator.persist(WORKER_PATH, EMPTY);
     }
 
     /**
      * close
      */
     public void close() {
-        if (isStarted.compareAndSet(true, false)) {
-            if (zookeeperCachedOperator != null) {
-                zookeeperCachedOperator.close();
-            }
+        if (isStarted.compareAndSet(true, false) && registerOperator != null) {
+            registerOperator.close();
         }
     }
 
     /**
      * get master path
+     *
      * @return master path
      */
     public String getMasterPath() {
@@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker path
+     *
      * @return worker path
      */
     public String getWorkerPath() {
@@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
     }
 
     /**
-     *  get master nodes directly
+     * get master nodes directly
+     *
      * @return master nodes
      */
     public Set<String> getMasterNodesDirectly() {
@@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
     }
 
     /**
-     *  get worker nodes directly
+     * get worker nodes directly
+     *
      * @return master nodes
      */
     public Set<String> getWorkerNodesDirectly() {
@@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker group directly
+     *
      * @return worker group nodes
      */
     public Set<String> getWorkerGroupDirectly() {
@@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker group nodes
+     *
      * @param workerGroup
      * @return
      */
@@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * whether worker path
+     *
      * @param path path
      * @return result
      */
@@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * whether master path
+     *
      * @param path path
      * @return result
      */
@@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker group path
+     *
      * @param workerGroup workerGroup
      * @return worker group path
      */
@@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get children nodes
+     *
      * @param key key
      * @return children nodes
      */
     public List<String> getChildrenKeys(final String key) {
-        return zookeeperCachedOperator.getChildrenKeys(key);
+        return registerOperator.getChildrenKeys(key);
+    }
+
+    /**
+     * @return get dead server node parent path
+     */
+    public String getDeadZNodeParentPath() {
+        return registerOperator.getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    public void setStoppable(IStoppable stoppable) {
+        this.stoppable = stoppable;
+    }
+
+    public IStoppable getStoppable() {
+        return stoppable;
     }
 
     /**
-     * get zookeeperCachedOperator
-     * @return zookeeperCachedOperator
+     * check dead server or not , if dead, stop self
+     *
+     * @param zNode      node path
+     * @param serverType master or worker prefix
+     * @return true if not exists
+     * @throws Exception errors
      */
-    public ZookeeperCachedOperator getZookeeperCachedOperator() {
-        return zookeeperCachedOperator;
+    protected boolean checkIsDeadServer(String zNode, String serverType) 
throws Exception {
+        //ip_sequenceno
+        String[] zNodesPath = zNode.split("\\/");
+        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
+
+        String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : 
WORKER_PREFIX;
+        String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type 
+ UNDERLINE + ipSeqNo;
+
+        if (!registerOperator.isExisted(zNode) || 
registerOperator.isExisted(deadServerPath)) {
+            return true;
+        }
+
+        return false;
     }
 
+    public RegisterOperator getRegisterOperator() {
+        return registerOperator;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index a267b5b..10880bf 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -18,6 +18,8 @@
 package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -33,20 +35,24 @@ import 
org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
+import java.util.Set;
+
 import javax.annotation.PostConstruct;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.springframework.beans.factory.annotation.Autowired;
+
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 
 /**
- *  worker server
+ * worker server
  */
 @ComponentScan("org.apache.dolphinscheduler")
-public class WorkerServer {
+public class WorkerServer implements IStoppable {
 
     /**
      * logger
@@ -54,31 +60,31 @@ public class WorkerServer {
     private static final Logger logger = 
LoggerFactory.getLogger(WorkerServer.class);
 
     /**
-     *  netty remote server
+     * netty remote server
      */
     private NettyRemotingServer nettyRemotingServer;
 
     /**
-     *  worker registry
+     * worker registry
      */
     @Autowired
     private WorkerRegistry workerRegistry;
 
     /**
-     *  worker config
+     * worker config
      */
     @Autowired
     private WorkerConfig workerConfig;
 
     /**
-     *  spring application context
-     *  only use it for initialization
+     * spring application context
+     * only use it for initialization
      */
     @Autowired
     private SpringApplicationContext springApplicationContext;
 
     /**
-     *  alert model netty remote server
+     * alert model netty remote server
      */
     private AlertClientService alertClientService;
 
@@ -105,24 +111,31 @@ public class WorkerServer {
      */
     @PostConstruct
     public void run() {
-        logger.info("start worker server...");
-
-        //alert-server client registry
-        alertClientService = new 
AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT);
-
-        //init remoting server
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(workerConfig.getListenPort());
-        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, 
new TaskExecuteProcessor(alertClientService));
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new 
TaskKillProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, 
new DBTaskAckProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new 
DBTaskResponseProcessor());
-        this.nettyRemotingServer.start();
-
-        // worker registry
-        this.workerRegistry.registry();
-
+        try {
+            logger.info("start worker server...");
+
+            //init remoting server
+            NettyServerConfig serverConfig = new NettyServerConfig();
+            serverConfig.setListenPort(workerConfig.getListenPort());
+            this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, 
new TaskExecuteProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new 
TaskKillProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new 
DBTaskAckProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new 
DBTaskResponseProcessor());
+            this.nettyRemotingServer.start();
+
+            
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
+            Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
+            
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths,
 ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
+            // worker registry
+            this.workerRegistry.registry();
+
+            // retry report task status
+            this.retryReportTaskStatusThread.start();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
         // task execute manager
         this.workerManagerThread.start();
 
@@ -135,7 +148,9 @@ public class WorkerServer {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                close("shutdownHook");
+                if (Stopper.isRunning()) {
+                    close("shutdownHook");
+                }
             }
         }));
     }
@@ -167,8 +182,13 @@ public class WorkerServer {
 
         } catch (Exception e) {
             logger.error("worker server stop exception ", e);
+        } finally {
             System.exit(-1);
         }
     }
 
+    /**
+     * Stoppable callback. During startup this server hands itself to the
+     * ZooKeeper registry center through {@code setStoppable(this)}; a stop
+     * request is then handled like any other shutdown by delegating to
+     * {@code close}.
+     *
+     * @param cause reason for stopping, forwarded unchanged to {@code close}
+     */
+    @Override
+    public void stop(String cause) {
+        this.close(cause);
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 3d4d73f..b763497 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static org.apache.dolphinscheduler.common.Constants.SLASH;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -29,9 +30,7 @@ import 
org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 
 import java.util.Date;
 import java.util.Set;
@@ -90,6 +89,14 @@ public class WorkerRegistry {
     }
 
     /**
+     * Expose the ZooKeeper registry center this worker registry writes to, so
+     * callers can reach its register operator and its stoppable hook.
+     *
+     * @return the registry center used by this worker registry
+     */
+    public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+        return this.zookeeperRegistryCenter;
+    }
+
+    /**
      * registry
      */
     public void registry() {
@@ -98,28 +105,27 @@ public class WorkerRegistry {
         int workerHeartbeatInterval = 
workerConfig.getWorkerHeartbeatInterval();
 
         for (String workerZKPath : workerZkPaths) {
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
 "");
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
 ConnectionStateListener() {
-                @Override
-                public void stateChanged(CuratorFramework client, 
ConnectionState newState) {
+            
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, 
"");
+            
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+                (client,newState) -> {
                     if (newState == ConnectionState.LOST) {
                         logger.error("worker : {} connection lost from 
zookeeper", address);
                     } else if (newState == ConnectionState.RECONNECTED) {
                         logger.info("worker : {} reconnected to zookeeper", 
address);
-                        
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
 "");
+                        
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, 
"");
                     } else if (newState == ConnectionState.SUSPENDED) {
                         logger.warn("worker : {} connection SUSPENDED ", 
address);
                     }
-                }
-            });
+                });
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
         }
 
         HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
-            this.workerConfig.getWorkerReservedMemory(),
-            this.workerConfig.getWorkerMaxCpuloadAvg(),
-            workerZkPaths,
-            this.zookeeperRegistryCenter);
+                this.workerConfig.getWorkerReservedMemory(),
+                this.workerConfig.getWorkerMaxCpuloadAvg(),
+                workerZkPaths,
+                Constants.WORKER_PREFIX,
+                this.zookeeperRegistryCenter);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, 
workerHeartbeatInterval);
@@ -132,7 +138,7 @@ public class WorkerRegistry {
         String address = getLocalAddress();
         Set<String> workerZkPaths = getWorkerZkPaths();
         for (String workerZkPath : workerZkPaths) {
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
+            zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath);
             logger.info("worker node : {} unRegistry from ZK {}.", address, 
workerZkPath);
         }
         this.heartBeatExecutor.shutdownNow();
@@ -141,7 +147,7 @@ public class WorkerRegistry {
     /**
      * get worker path
      */
-    private Set<String> getWorkerZkPaths() {
+    public Set<String> getWorkerZkPaths() {
         Set<String> workerZkPaths = Sets.newHashSet();
 
         String address = getLocalAddress();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 1f0926b..28b752a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -85,6 +85,9 @@ public class ZKMasterClient extends AbstractZKClient {
             //  Master registry
             masterRegistry.registry();
 
+            String registPath = this.masterRegistry.getMasterPath();
+            
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath,
 ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
+
             // init system znode
             this.initSystemZNode();
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 8c2321d..74cd2da 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -45,7 +45,7 @@ import 
org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
 
 import java.util.ArrayList;
@@ -67,7 +67,7 @@ import 
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(classes = {DependencyConfig.class, 
SpringApplicationContext.class, SpringZKServer.class, 
CuratorZookeeperClient.class,
         NettyExecutorManager.class, ExecutorDispatcher.class, 
ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
-        ZookeeperNodeManager.class, ZookeeperCachedOperator.class, 
ZookeeperConfig.class, MasterConfig.class,
+        ZookeeperNodeManager.class, RegisterOperator.class, 
ZookeeperConfig.class, MasterConfig.class,
         CuratorZookeeperClient.class})
 public class TaskPriorityQueueConsumerTest {
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index a180f51..9b62473 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.registry;
 
 import static 
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
 
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -60,8 +59,8 @@ public class MasterRegistryTest {
         masterRegistry.registry();
         String masterPath = zookeeperRegistryCenter.getMasterPath();
         TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
-        String masterNodePath = masterPath + "/" + 
(NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
-        String heartbeat = 
zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+        String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + 
":" + masterConfig.getListenPort());
+        String heartbeat = 
zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
         Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, 
heartbeat.split(",").length);
         masterRegistry.unRegistry();
     }
@@ -73,7 +72,7 @@ public class MasterRegistryTest {
         TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
         masterRegistry.unRegistry();
         String masterPath = zookeeperRegistryCenter.getMasterPath();
-        List<String> childrenKeys = 
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+        List<String> childrenKeys = 
zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath);
         Assert.assertTrue(childrenKeys.isEmpty());
     }
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
new file mode 100644
index 0000000..24bb25c
--- /dev/null
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * zookeeper registry center test
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ZookeeperRegistryCenterTest {
+
+    private static final String DS_ROOT = "/dolphinscheduler";
+
+    @InjectMocks
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Mock
+    protected RegisterOperator registerOperator;
+
+    // presumably injected into the registry center by @InjectMocks; the test below
+    // supplies its own real config through registerOperator instead
+    @Mock
+    private ZookeeperConfig zookeeperConfig;
+
+    /**
+     * The dead-server parent path is the configured ds root followed by
+     * ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS.
+     */
+    @Test
+    public void testGetDeadZNodeParentPath() {
+        // a distinct name avoids shadowing the zookeeperConfig mock field
+        ZookeeperConfig dsRootConfig = new ZookeeperConfig();
+        dsRootConfig.setDsRoot(DS_ROOT);
+        Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(dsRootConfig);
+
+        String deadZNodeParentPath = zookeeperRegistryCenter.getDeadZNodeParentPath();
+
+        // JUnit's assertEquals takes (expected, actual)
+        Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, deadZNodeParentPath);
+    }
+
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 8938f49..bf04f1f 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -43,7 +43,7 @@ import 
org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
 
 import java.util.Date;
@@ -71,7 +71,7 @@ import io.netty.channel.Channel;
     ZookeeperRegistryCenter.class,
     MasterConfig.class,
     WorkerConfig.class,
-    ZookeeperCachedOperator.class,
+    RegisterOperator.class,
     ZookeeperConfig.class,
     ZookeeperNodeManager.class,
     TaskCallbackService.class,
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index a71e480..d7066c0 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry;
 
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +63,7 @@ public class WorkerRegistryTest {
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     @Mock
-    private ZookeeperCachedOperator zookeeperCachedOperator;
+    private RegisterOperator registerOperator;
 
     @Mock
     private CuratorFrameworkImpl zkClient;
@@ -69,15 +71,21 @@ public class WorkerRegistryTest {
     @Mock
     private WorkerConfig workerConfig;
 
+    /**
+     * worker groups reported by the mocked WorkerConfig; before() stubs the config
+     * with this set and testGetWorkerZkPaths() compares against its size
+     */
+    private static final Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
+
     @Before
     public void before() {
-        Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, 
TEST_WORKER_GROUP);
+
         Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
 
         
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
-        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
-        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
-        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
+        
Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator);
+        
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient);
+        
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn(
                 new Listenable<ConnectionStateListener>() {
                     @Override
                     public void addListener(ConnectionStateListener 
connectionStateListener) {
@@ -114,7 +122,7 @@ public class WorkerRegistryTest {
         int i = 0;
         for (String workerGroup : workerConfig.getWorkerGroups()) {
             String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" 
+ (NetUtils.getAddr(workerConfig.getListenPort()));
-            String heartbeat = 
zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
+            String heartbeat = 
zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
             if (0 == i) {
                 
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
             } else {
@@ -156,7 +164,7 @@ public class WorkerRegistryTest {
 
         for (String workerGroup : workerConfig.getWorkerGroups()) {
             String workerGroupPath = workerPath + "/" + workerGroup.trim();
-            List<String> childrenKeys = 
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+            List<String> childrenKeys = 
zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath);
             Assert.assertTrue(childrenKeys.isEmpty());
         }
 
@@ -168,4 +176,10 @@ public class WorkerRegistryTest {
 
         workerRegistry.unRegistry();
     }
+
+    /**
+     * one zk path is expected per configured worker group
+     */
+    @Test
+    public void testGetWorkerZkPaths() {
+        workerRegistry.init();
+        Set<String> workerZkPaths = workerRegistry.getWorkerZkPaths();
+        Assert.assertEquals(workerGroups.size(), workerZkPaths.size());
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 37d8f10..7cdf680 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -17,14 +17,8 @@
 
 package org.apache.dolphinscheduler.service.zk;
 
-import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
 import static org.apache.dolphinscheduler.common.Constants.COLON;
-import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
 import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
-import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -47,58 +41,11 @@ import org.springframework.stereotype.Component;
  * abstract zookeeper client
  */
 @Component
-public abstract class AbstractZKClient extends ZookeeperCachedOperator {
+public abstract class AbstractZKClient extends RegisterOperator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
 
     /**
-     * remove dead server by host
-     *
-     * @param host host
-     * @param serverType serverType
-     */
-    public void removeDeadServerByHost(String host, String serverType) {
-        List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
-        for (String serverPath : deadServers) {
-            if (serverPath.startsWith(serverType + UNDERLINE + host)) {
-                String server = getDeadZNodeParentPath() + SINGLE_SLASH + 
serverPath;
-                super.remove(server);
-                logger.info("{} server {} deleted from zk dead server path 
success", serverType, host);
-            }
-        }
-    }
-
-    /**
-     * opType(add): if find dead server , then add to zk deadServerPath
-     * opType(delete): delete path from zk
-     *
-     * @param zNode node path
-     * @param zkNodeType master or worker
-     * @param opType delete or add
-     */
-    public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String 
opType) {
-        String host = getHostByEventDataPath(zNode);
-        String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : 
WORKER_PREFIX;
-
-        //check server restart, if restart , dead server path in zk should be 
delete
-        if (opType.equals(DELETE_ZK_OP)) {
-            removeDeadServerByHost(host, type);
-
-        } else if (opType.equals(ADD_ZK_OP)) {
-            String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + 
type + UNDERLINE + host;
-            if (!super.isExisted(deadServerPath)) {
-                //add dead server info to zk dead server path : /dead-servers/
-
-                super.persist(deadServerPath, (type + UNDERLINE + host));
-
-                logger.info("{} server dead , and {} added to zk dead server 
path success",
-                        zkNodeType, zNode);
-            }
-        }
-
-    }
-
-    /**
      * get active master num
      *
      * @return active master number
@@ -187,7 +134,7 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
     /**
      * check the zookeeper node already exists
      *
-     * @param host host
+     * @param host       host
      * @param zkNodeType zookeeper node type
      * @return true if exists
      */
@@ -247,12 +194,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
         return path;
     }
 
-    /**
-     * @return get dead server node parent path
-     */
-    protected String getDeadZNodeParentPath() {
-        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
-    }
 
     /**
      * @return get master start up lock path
@@ -310,26 +251,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
         }
     }
 
-    /**
-     * get host ip, string format: masterParentPath/ip
-     *
-     * @param path path
-     * @return host ip, string format: masterParentPath/ip
-     */
-    protected String getHostByEventDataPath(String path) {
-        if (StringUtils.isEmpty(path)) {
-            logger.error("empty path!");
-            return "";
-        }
-        String[] pathArray = path.split(SINGLE_SLASH);
-        if (pathArray.length < 1) {
-            logger.error("parse ip error: {}", path);
-            return "";
-        }
-        return pathArray[pathArray.length - 1];
-
-    }
-
     @Override
     public String toString() {
         return "AbstractZKClient{"
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
new file mode 100644
index 0000000..0fd4a4f
--- /dev/null
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Register operator: a cached ZooKeeper operator extended with helpers that
+ * maintain the dead-server list kept under the dead-server parent path.
+ */
+@Component
+public class RegisterOperator extends ZookeeperCachedOperator {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterOperator.class);
+
+    /**
+     * @return dead server node parent path: the configured ds root followed by
+     *         {@link Constants#ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS}
+     */
+    protected String getDeadZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    /**
+     * Remove the dead-server entry of the given host.
+     *
+     * <p>Only the child named exactly {@code serverType + UNDERLINE + host} is removed, which is
+     * the name {@link #handleDeadServer(String, ZKNodeType, String)} creates. A prefix match would
+     * also remove other servers whose address merely starts with this one (e.g. port 1234 vs
+     * 12345), and an empty host would match every entry of the type, so an empty host is ignored.
+     *
+     * @param host       host, i.e. the last segment of the server's zk path
+     * @param serverType server type prefix, MASTER_PREFIX or WORKER_PREFIX
+     * @throws Exception propagated from the underlying zk read/delete operations
+     */
+    public void removeDeadServerByHost(String host, String serverType) throws Exception {
+        if (StringUtils.isEmpty(host)) {
+            logger.warn("empty host, skip removing {} server from zk dead server path", serverType);
+            return;
+        }
+        String deadServerParentPath = getDeadZNodeParentPath();
+        String deadServerName = serverType + UNDERLINE + host;
+        List<String> deadServers = super.getChildrenKeys(deadServerParentPath);
+        for (String serverPath : deadServers) {
+            if (deadServerName.equals(serverPath)) {
+                super.remove(deadServerParentPath + SINGLE_SLASH + serverPath);
+                logger.info("{} server {} deleted from zk dead server path success", serverType, host);
+            }
+        }
+    }
+
+    /**
+     * Get the host of a server node, i.e. the last segment of its zk path.
+     *
+     * @param path zk node path, e.g. masterParentPath/ip
+     * @return the last path segment, or an empty string if the path is empty or has no segment
+     */
+    protected String getHostByEventDataPath(String path) {
+        if (StringUtils.isEmpty(path)) {
+            logger.error("empty path!");
+            return "";
+        }
+        String[] pathArray = path.split(SINGLE_SLASH);
+        if (pathArray.length == 0) {
+            logger.error("parse ip error: {}", path);
+            return "";
+        }
+        return pathArray[pathArray.length - 1];
+    }
+
+    /**
+     * opType(add): if find dead server , then add to zk deadServerPath
+     * opType(delete): delete path from zk
+     *
+     * <p>Nothing is done when no host can be parsed from {@code zNode}; an unknown
+     * {@code opType} (including null) is logged and ignored.
+     *
+     * @param zNode      node path
+     * @param zkNodeType master or worker
+     * @param opType     delete or add
+     * @throws Exception propagated from the underlying zk operations
+     */
+    public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
+        String host = getHostByEventDataPath(zNode);
+        if (StringUtils.isEmpty(host)) {
+            // getHostByEventDataPath already logged why; without a host there is nothing to add or remove
+            return;
+        }
+        String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
+
+        if (DELETE_ZK_OP.equals(opType)) {
+            // the server has (re)started, so it must no longer be listed as dead
+            removeDeadServerByHost(host, type);
+        } else if (ADD_ZK_OP.equals(opType)) {
+            String deadServerName = type + UNDERLINE + host;
+            String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + deadServerName;
+            if (!super.isExisted(deadServerPath)) {
+                // add dead server info to zk dead server path
+                super.persist(deadServerPath, deadServerName);
+                logger.info("{} server dead , and {} added to zk dead server path success",
+                        zkNodeType, zNode);
+            }
+        } else {
+            logger.warn("unknown dead server operation type: {}", opType);
+        }
+    }
+
+    /**
+     * Apply {@link #handleDeadServer(String, ZKNodeType, String)} to every node path in the set.
+     *
+     * @param zNodeSet   node path set; null is treated as empty
+     * @param zkNodeType master or worker
+     * @param opType     delete or add
+     * @throws Exception propagated from the underlying zk operations
+     */
+    public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception {
+        if (zNodeSet == null) {
+            return;
+        }
+        for (String zNode : zNodeSet) {
+            handleDeadServer(zNode, zkNodeType, opType);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 88c339b..54913cf 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -32,6 +32,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+/**
+ * zookeeper cache operator
+ */
 @Component
 public class ZookeeperCachedOperator extends ZookeeperOperator {
 
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
new file mode 100644
index 0000000..f828c07
--- /dev/null
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * register operator test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class RegisterOperatorTest {
+
+    private static ZKServer zkServer;
+
+    @InjectMocks
+    private RegisterOperator registerOperator;
+
+    @Mock
+    private ZookeeperConfig zookeeperConfig;
+
+    private static final String DS_ROOT = "/dolphinscheduler";
+    private static final String MASTER_NODE = "127.0.0.1:5678";
+
+    /**
+     * port the embedded zookeeper server listens on for every test
+     */
+    private static final int ZK_PORT = 2185;
+
+    /**
+     * upper bound on how long a test waits for the embedded zookeeper server to accept connections
+     */
+    private static final long ZK_STARTUP_TIMEOUT_SECONDS = 30;
+
+    /**
+     * start the embedded zookeeper server on a background thread
+     * (presumably because startLocalZkServer blocks while the server runs — confirm);
+     * each test waits for it to be reachable before building the client
+     */
+    @Before
+    public void before() {
+        new Thread(() -> {
+            if (zkServer == null) {
+                zkServer = new ZKServer();
+            }
+            zkServer.startLocalZkServer(ZK_PORT);
+        }).start();
+    }
+
+    /**
+     * release the client built by the test, then stop the embedded server
+     */
+    @After
+    public void after() {
+        // close the client first so it does not keep retrying against a stopped server
+        // and leak into the following tests
+        if (registerOperator != null && registerOperator.getZkClient() != null) {
+            registerOperator.getZkClient().close();
+        }
+        if (zkServer != null) {
+            zkServer.stop();
+        }
+    }
+
+    @Test
+    public void testAfterPropertiesSet() throws Exception {
+        initRegisterOperator();
+        Assert.assertNotNull(registerOperator.getZkClient());
+    }
+
+    @Test
+    public void testGetDeadZNodeParentPath() throws Exception {
+        initRegisterOperator();
+        String path = registerOperator.getDeadZNodeParentPath();
+
+        Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path);
+    }
+
+    @Test
+    public void testHandleDeadServer() throws Exception {
+        initRegisterOperator();
+        registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER, Constants.ADD_ZK_OP);
+        String path = registerOperator.getDeadZNodeParentPath();
+        Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(deadMasterNodeName()));
+    }
+
+    @Test
+    public void testRemoveDeadServerByHost() throws Exception {
+        initRegisterOperator();
+        String path = registerOperator.getDeadZNodeParentPath();
+
+        registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER, Constants.ADD_ZK_OP);
+        Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(deadMasterNodeName()));
+
+        registerOperator.removeDeadServerByHost(MASTER_NODE, Constants.MASTER_PREFIX);
+        Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(deadMasterNodeName()));
+    }
+
+    /**
+     * wait for the embedded server, point the mocked zookeeper config at it and build the client
+     */
+    private void initRegisterOperator() throws Exception {
+        awaitZkServer();
+        Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:" + ZK_PORT);
+        Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100);
+        Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10);
+        Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000);
+        Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000);
+        Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000);
+        Mockito.when(zookeeperConfig.getDigest()).thenReturn("");
+        Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT);
+        Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000);
+
+        registerOperator.afterPropertiesSet();
+    }
+
+    /**
+     * poll the embedded server's port until it accepts a TCP connection,
+     * instead of sleeping a fixed amount of time;
+     * fails the test if the server is not reachable within ZK_STARTUP_TIMEOUT_SECONDS
+     */
+    private static void awaitZkServer() throws InterruptedException {
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(ZK_STARTUP_TIMEOUT_SECONDS);
+        while (System.nanoTime() < deadline) {
+            try (java.net.Socket socket = new java.net.Socket()) {
+                socket.connect(new java.net.InetSocketAddress("127.0.0.1", ZK_PORT), 500);
+                return;
+            } catch (java.io.IOException e) {
+                // server not listening yet; retry after a short pause
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        }
+        Assert.fail("embedded zookeeper did not start on port " + ZK_PORT
+                + " within " + ZK_STARTUP_TIMEOUT_SECONDS + "s");
+    }
+
+    /**
+     * dead-server child node name expected for MASTER_NODE, formatted as "{MASTER_PREFIX}_{host}"
+     */
+    private static String deadMasterNodeName() {
+        return String.format("%s_%s", Constants.MASTER_PREFIX, MASTER_NODE);
+    }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e69ff37..a07fde2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -930,6 +930,7 @@
                         
<include>**/server/master/processor/TaskKillResponseProcessorTest.java</include>
                         
<include>**/server/master/processor/queue/TaskResponseServiceTest.java</include>
                         
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
+                        
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         
<include>**/server/utils/ExecutionContextTestUtils.java</include>
                         <include>**/server/utils/HostTest.java</include>
@@ -961,6 +962,7 @@
                         
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
                         <include>**/service/zk/ZKServerTest.java</include>
                         
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
+                        
<include>**/service/zk/RegisterOperatorTest.java</include>
                         
<include>**/service/queue/TaskUpdateQueueTest.java</include>
                         
<include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
 

Reply via email to