This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
     new 7100247  [1.3.6-Prepare][Improvement-4624][cherry pick] When the server exist in the dead server list of zk,need stop service byself (#5013)
7100247 is described below

commit 710024719e2353b491801bdb6a2daa2ce8f21ed7
Author: lgcareer <[email protected]>
AuthorDate: Thu Mar 11 21:45:43 2021 +0800

    [1.3.6-Prepare][Improvement-4624][cherry pick] When the server exist in the dead server list of zk,need stop service byself (#5013)
---
 .../server/master/MasterServer.java                |  63 ++-
 .../dispatch/host/LowerWeightHostManager.java      |   2 +-
 .../server/master/registry/MasterRegistry.java     |  73 +--
 .../server/registry/HeartBeatTask.java             |  40 +-
 .../server/registry/ZookeeperNodeManager.java      |   4 +-
 .../server/registry/ZookeeperRegistryCenter.java   |  92 +++-
 .../server/worker/WorkerServer.java                |  30 +-
 .../server/worker/registry/WorkerRegistry.java     |  60 ++-
 .../dolphinscheduler/server/zk/ZKMasterClient.java |   7 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    |  35 +-
 .../server/master/registry/MasterRegistryTest.java |  25 +-
 .../registry/ZookeeperRegistryCenterTest.java      |  61 +++
 .../worker/processor/TaskCallbackServiceTest.java  |  78 ++-
 .../server/worker/registry/WorkerRegistryTest.java |  45 +-
 .../service/zk/AbstractZKClient.java               | 542 +++++++++------------
 .../service/zk/RegisterOperator.java               | 155 ++++++
 .../service/zk/ZookeeperConfig.java                |  11 +
 .../service/zk/RegisterOperatorTest.java           | 116 +++++
 pom.xml                                            |   3 +
 19 files changed, 939 insertions(+), 503 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 18882a2..f1962c7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,6 +26,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.server.worker.WorkerServer;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -42,13 +44,10 @@ import 
org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.FilterType;
 
-
-
-
 @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
         @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = 
{WorkerServer.class})
 })
-public class MasterServer {
+public class MasterServer implements IStoppable {
 
     /**
      * logger of MasterServer
@@ -74,6 +73,12 @@ public class MasterServer {
     private NettyRemotingServer nettyRemotingServer;
 
     /**
+     * master registry
+     */
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    /**
      * zk master client
      */
     @Autowired
@@ -100,19 +105,26 @@ public class MasterServer {
      * run master server
      */
     @PostConstruct
-    public void run(){
+    public void run() {
+        try {
+            //init remoting server
+            NettyServerConfig serverConfig = new NettyServerConfig();
+            serverConfig.setListenPort(masterConfig.getListenPort());
+            this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
+            
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
+            this.nettyRemotingServer.start();
 
-        //init remoting server
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(masterConfig.getListenPort());
-        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
-        this.nettyRemotingServer.start();
+            
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
 
         // self tolerant
-        this.zkMasterClient.start();
+        this.zkMasterClient.start(this);
 
         // scheduler start
         this.masterSchedulerService.start();
@@ -137,7 +149,9 @@ public class MasterServer {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                close("shutdownHook");
+                if (Stopper.isRunning()) {
+                    close("shutdownHook");
+                }
             }
         }));
 
@@ -145,13 +159,14 @@ public class MasterServer {
 
     /**
      * gracefully close
+     *
      * @param cause close cause
      */
     public void close(String cause) {
 
         try {
             //execute only once
-            if(Stopper.isStopped()){
+            if (Stopper.isStopped()) {
                 return;
             }
 
@@ -163,24 +178,32 @@ public class MasterServer {
             try {
                 //thread sleep 3 seconds for thread quietly stop
                 Thread.sleep(3000L);
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.warn("thread sleep exception ", e);
             }
             //
             this.masterSchedulerService.close();
             this.nettyRemotingServer.close();
+            this.masterRegistry.unRegistry();
             this.zkMasterClient.close();
             //close quartz
-            try{
+            try {
                 QuartzExecutors.getInstance().shutdown();
                 logger.info("Quartz service stopped");
-            }catch (Exception e){
-                logger.warn("Quartz service stopped 
exception:{}",e.getMessage());
+            } catch (Exception e) {
+                logger.warn("Quartz service stopped exception:{}", 
e.getMessage());
             }
+
         } catch (Exception e) {
             logger.error("master server stop exception ", e);
+        } finally {
             System.exit(-1);
         }
     }
+
+    @Override
+    public void stop(String cause) {
+        close(cause);
+    }
 }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 1872ae0..ac7d8b0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -149,7 +149,7 @@ public class LowerWeightHostManager extends 
CommonHostManager {
                     String workerGroupPath = 
registryCenter.getWorkerGroupPath(workerGroup);
                     Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
                     for(String node : nodes){
-                        String heartbeat = 
registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
+                        String heartbeat = 
registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
                         if(StringUtils.isNotEmpty(heartbeat)
                                 && heartbeat.split(COMMA).length == 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
                             String[] parts = heartbeat.split(COMMA);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 0f41ba5..0624f0c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -14,8 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master.registry;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+
+import org.apache.curator.framework.state.ConnectionState;
+
 import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -23,15 +34,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PostConstruct;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,7 +42,7 @@ import org.springframework.stereotype.Service;
 import com.google.common.collect.Sets;
 
 /**
- *  master registry
+ * master registry
  */
 @Service
 public class MasterRegistry {
@@ -48,7 +50,7 @@ public class MasterRegistry {
     private final Logger logger = 
LoggerFactory.getLogger(MasterRegistry.class);
 
     /**
-     *  zookeeper registry center
+     * zookeeper registry center
      */
     @Autowired
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
@@ -65,42 +67,41 @@ public class MasterRegistry {
     private ScheduledExecutorService heartBeatExecutor;
 
     /**
-     * worker start time
+     * master start time
      */
     private String startTime;
 
-
     @PostConstruct
-    public void init(){
+    public void init() {
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
 
     /**
-     *  registry
+     * registry
      */
     public void registry() {
         String address = OSUtils.getHost();
         String localNodePath = getMasterPath();
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
 "");
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
 ConnectionStateListener() {
-            @Override
-            public void stateChanged(CuratorFramework client, ConnectionState 
newState) {
-                if(newState == ConnectionState.LOST){
+        
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
+        
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+            (client, newState) -> {
+                if (newState == ConnectionState.LOST) {
                     logger.error("master : {} connection lost from zookeeper", 
address);
-                } else if(newState == ConnectionState.RECONNECTED){
+                } else if (newState == ConnectionState.RECONNECTED) {
                     logger.info("master : {} reconnected to zookeeper", 
address);
-                    
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
 "");
-                } else if(newState == ConnectionState.SUSPENDED){
+                    
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
+                } else if (newState == ConnectionState.SUSPENDED) {
                     logger.warn("master : {} connection SUSPENDED ", address);
+                    
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
                 }
-            }
-        });
+            });
         int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
                 masterConfig.getMasterReservedMemory(),
                 masterConfig.getMasterMaxCpuloadAvg(),
                 Sets.newHashSet(getMasterPath()),
+                Constants.MASTER_PREFIX,
                 zookeeperRegistryCenter);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
@@ -108,31 +109,37 @@ public class MasterRegistry {
     }
 
     /**
-     *  remove registry info
+     * remove registry info
      */
     public void unRegistry() {
         String address = getLocalAddress();
         String localNodePath = getMasterPath();
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+        zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
         logger.info("master node : {} unRegistry to ZK.", address);
     }
 
     /**
-     *  get master path
-     * @return
+     * get master path
      */
-    private String getMasterPath() {
+    public String getMasterPath() {
         String address = getLocalAddress();
-        String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + 
"/" + address;
-        return localNodePath;
+        return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
     }
 
     /**
-     *  get local address
+     * get local address
      * @return
      */
     private String getLocalAddress(){
         return OSUtils.getAddr(masterConfig.getListenPort());
     }
 
+    /**
+     * get zookeeper registry center
+     * @return ZookeeperRegistryCenter
+     */
+    public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+        return zookeeperRegistryCenter;
+    }
+
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index bd8c79c..90d6ea3 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -19,16 +19,21 @@ package org.apache.dolphinscheduler.server.registry;
 
 import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
 
-import java.util.Date;
-import java.util.Set;
-
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+
+import java.util.Date;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HeartBeatTask extends Thread {
+/**
+ * Heart beat task
+ */
+public class HeartBeatTask implements Runnable {
 
     private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
 
@@ -36,23 +41,39 @@ public class HeartBeatTask extends Thread {
     private double reservedMemory;
     private double maxCpuloadAvg;
     private Set<String> heartBeatPaths;
+    private String serverType;
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
+    /**
+     * server stop or not
+     */
+    protected IStoppable stoppable = null;
 
     public HeartBeatTask(String startTime,
                          double reservedMemory,
                          double maxCpuloadAvg,
                          Set<String> heartBeatPaths,
+                         String serverType,
                          ZookeeperRegistryCenter zookeeperRegistryCenter) {
         this.startTime = startTime;
         this.reservedMemory = reservedMemory;
         this.maxCpuloadAvg = maxCpuloadAvg;
         this.heartBeatPaths = heartBeatPaths;
         this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+        this.serverType = serverType;
     }
 
     @Override
     public void run() {
         try {
+
+            // check dead or not in zookeeper
+            for (String heartBeatPath : heartBeatPaths) {
+                if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, 
serverType)) {
+                    zookeeperRegistryCenter.getStoppable().stop("i was judged 
to death, release resources and stop myself");
+                    return;
+                }
+            }
+
             double availablePhysicalMemorySize = 
OSUtils.availablePhysicalMemorySize();
             double loadAverage = OSUtils.loadAverage();
 
@@ -78,10 +99,19 @@ public class HeartBeatTask extends Thread {
             builder.append(OSUtils.getProcessID());
 
             for (String heartBeatPath : heartBeatPaths) {
-                
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, 
builder.toString());
+                
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, 
builder.toString());
             }
         } catch (Throwable ex) {
             logger.error("error write heartbeat info", ex);
         }
     }
+
+    /**
+     * for stop server
+     *
+     * @param serverStoppable server stoppable interface
+     */
+    public void setStoppable(IStoppable serverStoppable) {
+        this.stoppable = serverStoppable;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 864276b..bae4d14 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements 
InitializingBean {
         /**
          * init MasterNodeListener listener
          */
-        registryCenter.getZookeeperCachedOperator().addListener(new 
MasterNodeListener());
+        registryCenter.getRegisterOperator().addListener(new 
MasterNodeListener());
         /**
          * init WorkerNodeListener listener
          */
-        registryCenter.getZookeeperCachedOperator().addListener(new 
WorkerGroupNodeListener());
+        registryCenter.getRegisterOperator().addListener(new 
WorkerGroupNodeListener());
     }
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 3ca62be..9017a13 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -17,19 +17,27 @@
 
 package org.apache.dolphinscheduler.server.registry;
 
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
 /**
- *  zookeeper register center
+ * zookeeper register center
  */
 @Service
 public class ZookeeperRegistryCenter implements InitializingBean {
@@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
 
     @Autowired
-    protected ZookeeperCachedOperator zookeeperCachedOperator;
-
+    protected RegisterOperator registerOperator;
     @Autowired
-    private  ZookeeperConfig zookeeperConfig;
+    private ZookeeperConfig zookeeperConfig;
 
     /**
      * nodes namespace
@@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     public final String EMPTY = "";
 
+    private IStoppable stoppable;
+
     @Override
     public void afterPropertiesSet() throws Exception {
         NODES = zookeeperConfig.getDsRoot() + "/nodes";
@@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
      * init nodes
      */
     private void initNodes() {
-        zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
-        zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
+        registerOperator.persist(MASTER_PATH, EMPTY);
+        registerOperator.persist(WORKER_PATH, EMPTY);
     }
 
     /**
      * close
      */
     public void close() {
-        if (isStarted.compareAndSet(true, false)) {
-            if (zookeeperCachedOperator != null) {
-                zookeeperCachedOperator.close();
-            }
+        if (isStarted.compareAndSet(true, false) && registerOperator != null) {
+            registerOperator.close();
         }
     }
 
     /**
      * get master path
+     *
      * @return master path
      */
     public String getMasterPath() {
@@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker path
+     *
      * @return worker path
      */
     public String getWorkerPath() {
@@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
     }
 
     /**
-     *  get master nodes directly
+     * get master nodes directly
+     *
      * @return master nodes
      */
     public Set<String> getMasterNodesDirectly() {
@@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
     }
 
     /**
-     *  get worker nodes directly
+     * get worker nodes directly
+     *
      * @return master nodes
      */
     public Set<String> getWorkerNodesDirectly() {
@@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker group directly
+     *
      * @return worker group nodes
      */
     public Set<String> getWorkerGroupDirectly() {
@@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker group nodes
+     *
      * @param workerGroup
      * @return
      */
@@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * whether worker path
+     *
      * @param path path
      * @return result
      */
@@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * whether master path
+     *
      * @param path path
      * @return result
      */
@@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get worker group path
+     *
      * @param workerGroup workerGroup
      * @return worker group path
      */
@@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     /**
      * get children nodes
+     *
      * @param key key
      * @return children nodes
      */
     public List<String> getChildrenKeys(final String key) {
-        return zookeeperCachedOperator.getChildrenKeys(key);
+        return registerOperator.getChildrenKeys(key);
+    }
+
+    /**
+     * @return get dead server node parent path
+     */
+    public String getDeadZNodeParentPath() {
+        return registerOperator.getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    public void setStoppable(IStoppable stoppable) {
+        this.stoppable = stoppable;
+    }
+
+    public IStoppable getStoppable() {
+        return stoppable;
     }
 
     /**
-     * get zookeeperCachedOperator
-     * @return zookeeperCachedOperator
+     * check dead server or not , if dead, stop self
+     *
+     * @param zNode      node path
+     * @param serverType master or worker prefix
+     * @return true if not exists
+     * @throws Exception errors
      */
-    public ZookeeperCachedOperator getZookeeperCachedOperator() {
-        return zookeeperCachedOperator;
+    protected boolean checkIsDeadServer(String zNode, String serverType) 
throws Exception {
+        //ip_sequenceno
+        String[] zNodesPath = zNode.split("\\/");
+        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
+
+        String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : 
WORKER_PREFIX;
+        String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type 
+ UNDERLINE + ipSeqNo;
+
+        if (!registerOperator.isExisted(zNode) || 
registerOperator.isExisted(deadServerPath)) {
+            return true;
+        }
+
+        return false;
     }
 
+    public RegisterOperator getRegisterOperator() {
+        return registerOperator;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 6895de3..b408f6b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -17,6 +17,8 @@
 package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -29,6 +31,9 @@ import 
org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import 
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -42,7 +47,7 @@ import javax.annotation.PostConstruct;
  *  worker server
  */
 @ComponentScan("org.apache.dolphinscheduler")
-public class WorkerServer {
+public class WorkerServer implements IStoppable {
 
     /**
      * logger
@@ -106,7 +111,15 @@ public class WorkerServer {
         this.nettyRemotingServer.start();
 
         // worker registry
-        this.workerRegistry.registry();
+        try {
+            this.workerRegistry.registry();
+            
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
+            Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
+            
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths,
 ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
 
         // retry report task status
         this.retryReportTaskStatusThread.start();
@@ -117,7 +130,9 @@ public class WorkerServer {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                close("shutdownHook");
+                if (Stopper.isRunning()) {
+                    close("shutdownHook");
+                }
             }
         }));
     }
@@ -126,7 +141,7 @@ public class WorkerServer {
 
         try {
             //execute only once
-            if(Stopper.isStopped()){
+            if (Stopper.isStopped()) {
                 return;
             }
 
@@ -138,7 +153,7 @@ public class WorkerServer {
             try {
                 //thread sleep 3 seconds for thread quitely stop
                 Thread.sleep(3000L);
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.warn("thread sleep exception", e);
             }
 
@@ -147,8 +162,13 @@ public class WorkerServer {
 
         } catch (Exception e) {
             logger.error("worker server stop exception ", e);
+        } finally {
             System.exit(-1);
         }
     }
 
+    @Override
+    public void stop(String cause) {
+        close(cause);
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 921d0de..01e4554 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -19,6 +19,17 @@ package org.apache.dolphinscheduler.server.worker.registry;
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static org.apache.dolphinscheduler.common.Constants.SLASH;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+
+import org.apache.curator.framework.state.ConnectionState;
+
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -27,16 +38,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PostConstruct;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,7 +47,7 @@ import com.google.common.collect.Sets;
 
 
 /**
- *  worker registry
+ * worker registry
  */
 @Service
 public class WorkerRegistry {
@@ -54,13 +55,13 @@ public class WorkerRegistry {
     private final Logger logger = 
LoggerFactory.getLogger(WorkerRegistry.class);
 
     /**
-     *  zookeeper registry center
+     * zookeeper registry center
      */
     @Autowired
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     /**
-     *  worker config
+     * worker config
      */
     @Autowired
     private WorkerConfig workerConfig;
@@ -79,14 +80,22 @@ public class WorkerRegistry {
     private Set<String> workerGroups;
 
     @PostConstruct
-    public void init(){
+    public void init() {
         this.workerGroups = workerConfig.getWorkerGroups();
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
 
     /**
-     *  registry
+     * get zookeeper registry center
+     * @return ZookeeperRegistryCenter
+     */
+    public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+        return zookeeperRegistryCenter;
+    }
+
+    /**
+     * registry
      */
     public void registry() {
         String address = OSUtils.getHost();
@@ -94,20 +103,18 @@ public class WorkerRegistry {
         int workerHeartbeatInterval = 
workerConfig.getWorkerHeartbeatInterval();
 
         for (String workerZKPath : workerZkPaths) {
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
 "");
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
 ConnectionStateListener() {
-                @Override
-                public void stateChanged(CuratorFramework client, 
ConnectionState newState) {
+            
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, 
"");
+            
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+                (client,newState) -> {
                     if (newState == ConnectionState.LOST) {
                         logger.error("worker : {} connection lost from 
zookeeper", address);
                     } else if (newState == ConnectionState.RECONNECTED) {
                         logger.info("worker : {} reconnected to zookeeper", 
address);
-                        
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
 "");
+                        
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, 
"");
                     } else if (newState == ConnectionState.SUSPENDED) {
                         logger.warn("worker : {} connection SUSPENDED ", 
address);
                     }
-                }
-            });
+                });
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
         }
 
@@ -115,6 +122,7 @@ public class WorkerRegistry {
                 this.workerConfig.getWorkerReservedMemory(),
                 this.workerConfig.getWorkerMaxCpuloadAvg(),
                 workerZkPaths,
+                Constants.WORKER_PREFIX,
                 this.zookeeperRegistryCenter);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
@@ -122,22 +130,22 @@ public class WorkerRegistry {
     }
 
     /**
-     *  remove registry info
+     * remove registry info
      */
     public void unRegistry() {
         String address = getLocalAddress();
         Set<String> workerZkPaths = getWorkerZkPaths();
         for (String workerZkPath : workerZkPaths) {
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
+            zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath);
             logger.info("worker node : {} unRegistry from ZK {}.", address, 
workerZkPath);
         }
         this.heartBeatExecutor.shutdownNow();
     }
 
     /**
-     *  get worker path
+     * get worker path
      */
-    private Set<String> getWorkerZkPaths() {
+    public Set<String> getWorkerZkPaths() {
         Set<String> workerZkPaths = Sets.newHashSet();
 
         String address = getLocalAddress();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 7f74c8c..2d8eed9 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.MasterServer;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -72,8 +73,7 @@ public class ZKMasterClient extends AbstractZKClient {
     @Autowired
     private MasterRegistry masterRegistry;
 
-    public void start() {
-
+    public void start(MasterServer masterServer) {
         InterProcessMutex mutex = null;
         try {
             // create distributed lock with the root node path of the lock 
space as /dolphinscheduler/lock/failover/master
@@ -83,6 +83,9 @@ public class ZKMasterClient extends AbstractZKClient {
 
             //  Master registry
             masterRegistry.registry();
+            
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
+            String registPath = this.masterRegistry.getMasterPath();
+            
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath,
 ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
 
             // init system znode
             this.initSystemZNode();
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index eb914f4..2c2a1b5 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -17,11 +17,20 @@
 
 package org.apache.dolphinscheduler.server.master.consumer;
 
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -35,8 +44,17 @@ import 
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.apache.curator.CuratorZookeeperClient;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -47,17 +65,10 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(classes={DependencyConfig.class, 
SpringApplicationContext.class, SpringZKServer.class,
         NettyExecutorManager.class, ExecutorDispatcher.class, 
ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
-        ZookeeperNodeManager.class, ZookeeperCachedOperator.class, 
ZookeeperConfig.class, MasterConfig.class})
+        ZookeeperNodeManager.class, RegisterOperator.class, 
ZookeeperConfig.class, MasterConfig.class})
 public class TaskPriorityQueueConsumerTest {
 
 
@@ -503,8 +514,6 @@ public class TaskPriorityQueueConsumerTest {
 
         TaskExecutionContext taskExecutionContext  = 
taskPriorityQueueConsumer.getTaskExecutionContext(1);
 
-
-
         Assert.assertNotNull(taskExecutionContext);
     }
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index 81d27be..2488d59 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -17,29 +17,32 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import org.apache.dolphinscheduler.common.utils.OSUtils;
+import static 
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
+
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.apache.curator.CuratorZookeeperClient;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
 /**
  * master registry test
  */
 @RunWith(SpringRunner.class)
-@ContextConfiguration(classes={SpringZKServer.class, 
MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, 
ZookeeperCachedOperator.class, ZookeeperConfig.class})
+@ContextConfiguration(classes = {SpringZKServer.class, MasterRegistry.class, 
ZookeeperRegistryCenter.class,
+        MasterConfig.class, ZookeeperCachedOperator.class, 
ZookeeperConfig.class, CuratorZookeeperClient.class})
 public class MasterRegistryTest {
 
     @Autowired
@@ -56,18 +59,20 @@ public class MasterRegistryTest {
         masterRegistry.registry();
         String masterPath = zookeeperRegistryCenter.getMasterPath();
         TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
-        String masterNodePath = masterPath + Constants.SLASH + 
(OSUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
-        String heartbeat = 
zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+        String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + 
":" + masterConfig.getListenPort());
+        String heartbeat = 
zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
         Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, 
heartbeat.split(",").length);
+        masterRegistry.unRegistry();
     }
 
     @Test
     public void testUnRegistry() throws InterruptedException {
+        masterRegistry.init();
         masterRegistry.registry();
         TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
         masterRegistry.unRegistry();
         String masterPath = zookeeperRegistryCenter.getMasterPath();
-        List<String> childrenKeys = 
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+        List<String> childrenKeys = 
zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath);
         Assert.assertTrue(childrenKeys.isEmpty());
     }
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
new file mode 100644
index 0000000..24bb25c
--- /dev/null
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * zookeeper registry center test
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ZookeeperRegistryCenterTest {
+
+    @InjectMocks
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Mock
+    protected RegisterOperator registerOperator;
+
+    @Mock
+    private ZookeeperConfig zookeeperConfig;
+
+    private static final String DS_ROOT = "/dolphinscheduler";
+
+    @Test
+    public void testGetDeadZNodeParentPath() {
+        ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
+        zookeeperConfig.setDsRoot(DS_ROOT);
+        
Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
+
+        String deadZNodeParentPath = 
zookeeperRegistryCenter.getDeadZNodeParentPath();
+
+        Assert.assertEquals(deadZNodeParentPath, DS_ROOT + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS);
+
+    }
+
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index c38ca3e..f228802 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@@ -33,12 +33,18 @@ import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import 
org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.apache.curator.CuratorZookeeperClient;
+
+import java.util.Date;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,17 +53,32 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import java.io.IOException;
-import java.util.Date;
+import io.netty.channel.Channel;
 
 /**
  * test task call back service
+ * todo  refactor it in the form of mock
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, 
SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, 
WorkerRegistry.class,
-        ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
-        ZookeeperCachedOperator.class, ZookeeperConfig.class, 
ZookeeperNodeManager.class, TaskCallbackService.class,
-        TaskResponseService.class, 
TaskAckProcessor.class,TaskResponseProcessor.class})
+@ContextConfiguration(classes = {
+    TaskCallbackServiceTestConfig.class,
+    SpringZKServer.class,
+    SpringApplicationContext.class,
+    MasterRegistry.class,
+    WorkerRegistry.class,
+    ZookeeperRegistryCenter.class,
+    MasterConfig.class,
+    WorkerConfig.class,
+    RegisterOperator.class,
+    ZookeeperConfig.class,
+    ZookeeperNodeManager.class,
+    TaskCallbackService.class,
+    TaskResponseService.class,
+    TaskAckProcessor.class,
+    TaskResponseProcessor.class,
+    TaskExecuteProcessor.class,
+    CuratorZookeeperClient.class,
+    TaskExecutionContextCacheManagerImpl.class})
 public class TaskCallbackServiceTest {
 
     @Autowired
@@ -74,6 +95,7 @@ public class TaskCallbackServiceTest {
 
     /**
      * send ack test
+     *
      * @throws Exception
      */
     @Test
@@ -101,6 +123,7 @@ public class TaskCallbackServiceTest {
 
     /**
      * send result test
+     *
      * @throws Exception
      */
     @Test
@@ -140,7 +163,7 @@ public class TaskCallbackServiceTest {
 
     @Test
     public void testPause(){
-        Assert.assertEquals(5000, taskCallbackService.pause(3));;
+        Assert.assertEquals(5000, taskCallbackService.pause(3));
     }
 
     @Test
@@ -171,41 +194,4 @@ public class TaskCallbackServiceTest {
         nettyRemotingServer.close();
         nettyRemotingClient.close();
     }
-
-//    @Test(expected = IllegalStateException.class)
-//    public void testSendAckWithIllegalStateException2(){
-//        masterRegistry.registry();
-//        final NettyServerConfig serverConfig = new NettyServerConfig();
-//        serverConfig.setListenPort(30000);
-//        NettyRemotingServer nettyRemotingServer = new 
NettyRemotingServer(serverConfig);
-//        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
taskAckProcessor);
-//        nettyRemotingServer.start();
-//
-//        final NettyClientConfig clientConfig = new NettyClientConfig();
-//        NettyRemotingClient nettyRemotingClient = new 
NettyRemotingClient(clientConfig);
-//        Channel channel = 
nettyRemotingClient.getChannel(Host.of("localhost:30000"));
-//        taskCallbackService.addRemoteChannel(1, new 
NettyRemoteChannel(channel, 1));
-//        channel.close();
-//        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
-//        ackCommand.setTaskInstanceId(1);
-//        ackCommand.setStartTime(new Date());
-//
-//        nettyRemotingServer.close();
-//
-//        taskCallbackService.sendAck(1, ackCommand.convert2Command());
-//        try {
-//            Thread.sleep(5000);
-//        } catch (InterruptedException e) {
-//            e.printStackTrace();
-//        }
-//
-//        Stopper.stop();
-//
-//        try {
-//            Thread.sleep(5000);
-//        } catch (InterruptedException e) {
-//            e.printStackTrace();
-//        }
-//    }
-
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index 98a78e9..0a4307b 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry;
 
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +63,7 @@ public class WorkerRegistryTest {
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     @Mock
-    private ZookeeperCachedOperator zookeeperCachedOperator;
+    private RegisterOperator registerOperator;
 
     @Mock
     private CuratorFrameworkImpl zkClient;
@@ -69,15 +71,21 @@ public class WorkerRegistryTest {
     @Mock
     private WorkerConfig workerConfig;
 
+    private static final Set<String> workerGroups;
+
+    static {
+        workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, 
TEST_WORKER_GROUP);
+    }
+
     @Before
     public void before() {
-        Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, 
TEST_WORKER_GROUP);
+
         Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
 
         
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
-        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
-        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
-        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
+        
Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator);
+        
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient);
+        
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn(
                 new Listenable<ConnectionStateListener>() {
                     @Override
                     public void addListener(ConnectionStateListener 
connectionStateListener) {
@@ -114,7 +122,7 @@ public class WorkerRegistryTest {
         int i = 0;
         for (String workerGroup : workerConfig.getWorkerGroups()) {
             String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" 
+ (OSUtils.getAddr(workerConfig.getListenPort()));
-            String heartbeat = 
zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
+            String heartbeat = 
zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
             if (0 == i) {
                 
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
             } else {
@@ -143,6 +151,7 @@ public class WorkerRegistryTest {
 
         Assert.assertEquals(0, testWorkerGroupPathZkChildren.size());
         Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size());
+        workerRegistry.unRegistry();
     }
 
     @Test
@@ -155,7 +164,7 @@ public class WorkerRegistryTest {
 
         for (String workerGroup : workerConfig.getWorkerGroups()) {
             String workerGroupPath = workerPath + "/" + workerGroup.trim();
-            List<String> childrenKeys = 
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+            List<String> childrenKeys = 
zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath);
             Assert.assertTrue(childrenKeys.isEmpty());
         }
 
@@ -167,4 +176,10 @@ public class WorkerRegistryTest {
 
         workerRegistry.unRegistry();
     }
+
+    @Test
+    public void testGetWorkerZkPaths() {
+        workerRegistry.init();
+        
Assert.assertEquals(workerGroups.size(),workerRegistry.getWorkerZkPaths().size());
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 1cc4db6..24cdb89 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -14,322 +14,256 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.zk;
 
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.ResInfo;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 /**
  * abstract zookeeper client
  */
 @Component
-public abstract class AbstractZKClient extends ZookeeperCachedOperator {
-
-       private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
-
-
-       /**
-        *  remove dead server by host
-        * @param host host
-        * @param serverType serverType
-        * @throws Exception
-        */
-       public void removeDeadServerByHost(String host, String serverType) 
throws Exception {
-               List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
-               for(String serverPath : deadServers){
-                       if(serverPath.startsWith(serverType+UNDERLINE+host)){
-                               String server = getDeadZNodeParentPath() + 
SINGLE_SLASH + serverPath;
-                               super.remove(server);
-                               logger.info("{} server {} deleted from zk dead 
server path success" , serverType , host);
-                       }
-               }
-       }
-
-
-       /**
-        * opType(add): if find dead server , then add to zk deadServerPath
-        * opType(delete): delete path from zk
-        *
-        * @param zNode                   node path
-        * @param zkNodeType      master or worker
-        * @param opType                  delete or add
-        * @throws Exception errors
-        */
-       public void handleDeadServer(String zNode, ZKNodeType zkNodeType, 
String opType) throws Exception {
-               String host = getHostByEventDataPath(zNode);
-               String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX 
: WORKER_PREFIX;
-
-               //check server restart, if restart , dead server path in zk 
should be delete
-               if(opType.equals(DELETE_ZK_OP)){
-                       removeDeadServerByHost(host, type);
-
-               }else if(opType.equals(ADD_ZK_OP)){
-                       String deadServerPath = getDeadZNodeParentPath() + 
SINGLE_SLASH + type + UNDERLINE + host;
-                       if(!super.isExisted(deadServerPath)){
-                               //add dead server info to zk dead server path : 
/dead-servers/
-
-                               super.persist(deadServerPath,(type + UNDERLINE 
+ host));
-
-                               logger.info("{} server dead , and {} added to 
zk dead server path success" ,
-                                               zkNodeType.toString(), zNode);
-                       }
-               }
-
-       }
-
-       /**
-        * get active master num
-        * @return active master number
-        */
-       public int getActiveMasterNum(){
-               List<String> childrenList = new ArrayList<>();
-               try {
-                       // read master node parent path from conf
-                       
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
-                               childrenList = 
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
-                       }
-               } catch (Exception e) {
-                       logger.error("getActiveMasterNum error",e);
-               }
-               return childrenList.size();
-       }
-
-       /**
-        *
-        * @return zookeeper quorum
-        */
-       public String getZookeeperQuorum(){
-               return getZookeeperConfig().getServerList();
-       }
-
-       /**
-        * get server list.
-        * @param zkNodeType zookeeper node type
-        * @return server list
-        */
-       public List<Server> getServersList(ZKNodeType zkNodeType){
-               Map<String, String> masterMap = getServerMaps(zkNodeType);
-               String parentPath = getZNodeParentPath(zkNodeType);
-
-               List<Server> masterServers = new ArrayList<>();
-               for (Map.Entry<String, String> entry : masterMap.entrySet()) {
-                       Server masterServer = 
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
-                       if(masterServer == null){
-                               continue;
-                       }
-                       String key = entry.getKey();
-                       masterServer.setZkDirectory(parentPath + "/"+ key);
-                       //set host and port
-                       String[] hostAndPort=key.split(COLON);
-                       String[] hosts=hostAndPort[0].split(DIVISION_STRING);
-                       // fetch the last one
-                       masterServer.setHost(hosts[hosts.length-1]);
-                       masterServer.setPort(Integer.parseInt(hostAndPort[1]));
-                       masterServers.add(masterServer);
-               }
-               return masterServers;
-       }
-
-       /**
-        * get master server list map.
-        * @param zkNodeType zookeeper node type
-        * @return result : {host : resource info}
-        */
-       public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
-
-               Map<String, String> masterMap = new HashMap<>();
-               try {
-                       String path =  getZNodeParentPath(zkNodeType);
-                       List<String> serverList  = super.getChildrenKeys(path);
-                       if(zkNodeType == ZKNodeType.WORKER){
-                           List<String> workerList = new ArrayList<>();
-                           for(String group : serverList){
-                               List<String> groupServers = 
super.getChildrenKeys(path + Constants.SLASH + group);
-                               for(String groupServer : groupServers){
-                                       workerList.add(group + Constants.SLASH 
+ groupServer);
-                                       }
-                               }
-                               serverList = workerList;
-                       }
-                       for(String server : serverList){
-                               masterMap.putIfAbsent(server, super.get(path + 
Constants.SLASH + server));
-                       }
-               } catch (Exception e) {
-                       logger.error("get server list failed", e);
-               }
-
-               return masterMap;
-       }
-
-       /**
-        * check the zookeeper node already exists
-        * @param host host
-        * @param zkNodeType zookeeper node type
-        * @return true if exists
-        */
-       public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
-               String path = getZNodeParentPath(zkNodeType);
-               if(StringUtils.isEmpty(path)){
-                       logger.error("check zk node exists error, host:{}, zk 
node type:{}",
-                                       host, zkNodeType.toString());
-                       return false;
-               }
-               Map<String, String> serverMaps = getServerMaps(zkNodeType);
-               for(String hostKey : serverMaps.keySet()){
-                       if(hostKey.contains(host)){
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       /**
-        *
-        * @return get worker node parent path
-        */
-       protected String getWorkerZNodeParentPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
-       }
-
-       /**
-        *
-        * @return get master node parent path
-        */
-       protected String getMasterZNodeParentPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
-       }
-
-       /**
-        *
-        * @return get master lock path
-        */
-       public String getMasterLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
-       }
-
-       /**
-        *
-        * @param zkNodeType zookeeper node type
-        * @return get zookeeper node parent path
-        */
-       public String getZNodeParentPath(ZKNodeType zkNodeType) {
-               String path = "";
-               switch (zkNodeType){
-                       case MASTER:
-                               return getMasterZNodeParentPath();
-                       case WORKER:
-                               return getWorkerZNodeParentPath();
-                       case DEAD_SERVER:
-                               return getDeadZNodeParentPath();
-                       default:
-                               break;
-               }
-               return path;
-       }
-
-       /**
-        *
-        * @return get dead server node parent path
-        */
-       protected String getDeadZNodeParentPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
-       }
-
-       /**
-        *
-        * @return get master start up lock path
-        */
-       public String getMasterStartUpLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
-       }
-
-       /**
-        *
-        * @return get master failover lock path
-        */
-       public String getMasterFailoverLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
-       }
-
-       /**
-        *
-        * @return get worker failover lock path
-        */
-       public String getWorkerFailoverLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
-       }
-
-       /**
-        * release mutex
-        * @param mutex mutex
-        */
-       public void releaseMutex(InterProcessMutex mutex) {
-               if (mutex != null){
-                       try {
-                               mutex.release();
-                       } catch (Exception e) {
-                               if(e.getMessage().equals("instance must be 
started before calling this method")){
-                                       logger.warn("lock release");
-                               }else{
-                                       logger.error("lock release failed",e);
-                               }
-
-                       }
-               }
-       }
-
-       /**
-        *  init system znode
-        */
-       protected void initSystemZNode(){
-               try {
-                       persist(getMasterZNodeParentPath(), "");
-                       persist(getWorkerZNodeParentPath(), "");
-                       persist(getDeadZNodeParentPath(), "");
-
-                       logger.info("initialize server nodes success.");
-               } catch (Exception e) {
-                       logger.error("init system znode failed",e);
-               }
-       }
-
-       /**
-        *  get host ip, string format: masterParentPath/ip
-        * @param path path
-        * @return host ip, string format: masterParentPath/ip
-        */
-       protected String getHostByEventDataPath(String path) {
-               if(StringUtils.isEmpty(path)){
-                       logger.error("empty path!");
-                       return "";
-               }
-               String[] pathArray = path.split(SINGLE_SLASH);
-               if(pathArray.length < 1){
-                       logger.error("parse ip error: {}", path);
-                       return "";
-               }
-               return pathArray[pathArray.length - 1];
-
-       }
-
-       @Override
-       public String toString() {
-               return "AbstractZKClient{" +
-                               "zkClient=" + zkClient +
-                               ", deadServerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
-                               ", masterZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
-                               ", workerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
-                               '}';
-       }
-}
\ No newline at end of file
+public abstract class AbstractZKClient extends RegisterOperator {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
+
+    /**
+     * Count the master nodes currently registered under the master parent path.
+     *
+     * @return number of master child nodes; 0 if the parent path does not exist or reading it fails
+     */
+    public int getActiveMasterNum() {
+        List<String> childrenList = new ArrayList<>();
+        try {
+            // list the children only when the master parent path exists
+            if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
+                childrenList = 
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
+            }
+        } catch (Exception e) {
+            logger.error("getActiveMasterNum error", e);
+        }
+        return childrenList.size();
+    }
+
+    /**
+     * @return the zookeeper server list configured in ZookeeperConfig (the quorum)
+     */
+    public String getZookeeperQuorum() {
+        return getZookeeperConfig().getServerList();
+    }
+
+    /**
+     * Build a Server for every registered node of the given type; nodes whose heartbeat data cannot be parsed are skipped.
+     *
+     * @param zkNodeType zookeeper node type
+     * @return the parsed servers, with zk directory, host and port filled in from each node key
+     */
+    public List<Server> getServersList(ZKNodeType zkNodeType) {
+        Map<String, String> masterMap = getServerMaps(zkNodeType);
+        String parentPath = getZNodeParentPath(zkNodeType);
+
+        List<Server> masterServers = new ArrayList<>();
+        for (Map.Entry<String, String> entry : masterMap.entrySet()) {
+            Server masterServer = 
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
+            if (masterServer == null) {
+                continue;
+            }
+            String key = entry.getKey();
+            masterServer.setZkDirectory(parentPath + "/" + key);
+            // the key is expected to look like "host:port" ("group/host:port" for workers, see getServerMaps)
+            String[] hostAndPort = key.split(COLON);
+            String[] hosts = hostAndPort[0].split(DIVISION_STRING);
+            // keep the last segment, presumably dropping the worker group prefix
+            masterServer.setHost(hosts[hosts.length - 1]);
+            masterServer.setPort(Integer.parseInt(hostAndPort[1]));
+            masterServers.add(masterServer);
+        }
+        return masterServers;
+    }
+
+    /**
+     * Read the data of every registered node of the given type; worker nodes are listed per group.
+     *
+     * @param zkNodeType zookeeper node type
+     * @return map from node key ("group/server" for workers) to node data; partial or empty if reading fails
+     */
+    public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
+
+        Map<String, String> masterMap = new HashMap<>();
+        try {
+            String path = getZNodeParentPath(zkNodeType);
+            List<String> serverList = super.getChildrenKeys(path);
+            if (zkNodeType == ZKNodeType.WORKER) {
+                List<String> workerList = new ArrayList<>();
+                for (String group : serverList) {
+                    List<String> groupServers = super.getChildrenKeys(path + 
Constants.SLASH + group);
+                    for (String groupServer : groupServers) {
+                        workerList.add(group + Constants.SLASH + groupServer);
+                    }
+                }
+                serverList = workerList;
+            }
+            for (String server : serverList) {
+                masterMap.putIfAbsent(server, super.get(path + Constants.SLASH 
+ server));
+            }
+        } catch (Exception e) {
+            logger.error("get server list failed", e);
+        }
+
+        return masterMap;
+    }
+
+    /**
+     * Check whether a registered node of the given type has a key containing the host (substring match).
+     *
+     * @param host host
+     * @param zkNodeType zookeeper node type
+     * @return true if some node key contains host; false otherwise or when the parent path is empty
+     */
+    public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+        String path = getZNodeParentPath(zkNodeType);
+        if (StringUtils.isEmpty(path)) {
+            logger.error("check zk node exists error, host:{}, zk node 
type:{}",
+                    host, zkNodeType);
+            return false;
+        }
+        Map<String, String> serverMaps = getServerMaps(zkNodeType);
+        for (String hostKey : serverMaps.keySet()) {
+            if (hostKey.contains(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return the worker node parent path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_WORKERS
+     */
+    protected String getWorkerZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
+    }
+
+    /**
+     * @return the master node parent path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_MASTERS
+     */
+    protected String getMasterZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
+    }
+
+    /**
+     * @return the master lock path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS
+     */
+    public String getMasterLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
+    }
+
+    /**
+     * @param zkNodeType zookeeper node type
+     * @return the parent path for MASTER, WORKER or DEAD_SERVER; "" for any other type
+     */
+    public String getZNodeParentPath(ZKNodeType zkNodeType) {
+        String path = "";
+        switch (zkNodeType) {
+            case MASTER:
+                return getMasterZNodeParentPath();
+            case WORKER:
+                return getWorkerZNodeParentPath();
+            case DEAD_SERVER:
+                return getDeadZNodeParentPath();
+            default:
+                break;
+        }
+        return path;
+    }
+
+
+    /**
+     * @return the master start-up failover lock path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS
+     */
+    public String getMasterStartUpLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+    }
+
+    /**
+     * @return the master failover lock path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS
+     */
+    public String getMasterFailoverLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+    }
+
+    /**
+     * @return the worker failover lock path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS
+     */
+    public String getWorkerFailoverLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+    }
+
+    /**
+     * Release the mutex if it is non-null; failures are logged and not rethrown.
+     *
+     * @param mutex mutex to release, may be null
+     */
+    public void releaseMutex(InterProcessMutex mutex) {
+        if (mutex != null) {
+            try {
+                mutex.release();
+            } catch (Exception e) {
+                if ("instance must be started before calling this 
method".equals(e.getMessage())) {
+                    logger.warn("lock release");
+                } else {
+                    logger.error("lock release failed", e);
+                }
+
+            }
+        }
+    }
+
+    /**
+     * Persist the master, worker and dead-server parent znodes with empty data; failures are logged.
+     */
+    protected void initSystemZNode() {
+        try {
+            persist(getMasterZNodeParentPath(), "");
+            persist(getWorkerZNodeParentPath(), "");
+            persist(getDeadZNodeParentPath(), "");
+
+            logger.info("initialize server nodes success.");
+        } catch (Exception e) {
+            logger.error("init system znode failed", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AbstractZKClient{"
+                + "zkClient=" + getZkClient()
+                + ", deadServerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+                + ", masterZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.MASTER) + '\''
+                + ", workerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.WORKER) + '\''
+                + '}';
+    }
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
new file mode 100644
index 0000000..0fd4a4f
--- /dev/null
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Register operator: maintains the dead-server records kept under the zookeeper
+ * dead-servers parent path (see {@link #getDeadZNodeParentPath()}).
+ */
+@Component
+public class RegisterOperator extends ZookeeperCachedOperator {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterOperator.class);
+
+    /**
+     * @return the dead server node parent path: dsRoot + ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS
+     */
+    protected String getDeadZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    /**
+     * Remove every dead-server record of the given host.
+     *
+     * <p>Records are matched by the prefix {@code serverType + UNDERLINE + host}. NOTE(review): being a
+     * prefix match, host "10.0.0.1:567" would also match a record for "10.0.0.1:5678".
+     *
+     * @param host       host, e.g. "127.0.0.1:5678"
+     * @param serverType server type prefix, e.g. {@link Constants#MASTER_PREFIX}
+     * @throws Exception if listing or removing zookeeper nodes fails
+     */
+    public void removeDeadServerByHost(String host, String serverType) throws Exception {
+        String deadServerParentPath = getDeadZNodeParentPath();
+        String deadServerPrefix = serverType + UNDERLINE + host;
+        List<String> deadServers = super.getChildrenKeys(deadServerParentPath);
+        for (String serverPath : deadServers) {
+            if (serverPath.startsWith(deadServerPrefix)) {
+                super.remove(deadServerParentPath + SINGLE_SLASH + serverPath);
+                logger.info("{} server {} deleted from zk dead server path success", serverType, host);
+            }
+        }
+    }
+
+    /**
+     * Extract the last segment of a zookeeper event path, e.g. the host of "masterParentPath/ip:port".
+     *
+     * @param path zookeeper node path
+     * @return the last path segment, or "" if the path is empty or has no segments
+     */
+    protected String getHostByEventDataPath(String path) {
+        if (StringUtils.isEmpty(path)) {
+            logger.error("empty path!");
+            return "";
+        }
+        String[] pathArray = path.split(SINGLE_SLASH);
+        if (pathArray.length < 1) {
+            logger.error("parse ip error: {}", path);
+            return "";
+        }
+        return pathArray[pathArray.length - 1];
+    }
+
+    /**
+     * Add or remove the dead-server record of a single node.
+     *
+     * <p>{@code ADD_ZK_OP}: persist {@code deadServerParentPath/type_host} unless it already exists.
+     * {@code DELETE_ZK_OP}: remove the node's dead-server records (the server has restarted).
+     * Any other operation type, including null, is logged and ignored.
+     *
+     * @param zNode      node path; its last segment is used as the host
+     * @param zkNodeType {@code MASTER} or worker (any non-MASTER type is recorded as a worker)
+     * @param opType     {@code ADD_ZK_OP} or {@code DELETE_ZK_OP}
+     * @throws Exception if a zookeeper operation fails
+     */
+    public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
+        String host = getHostByEventDataPath(zNode);
+        String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
+
+        // constant-first comparison so a null opType is ignored instead of throwing NPE
+        if (DELETE_ZK_OP.equals(opType)) {
+            // the server restarted, so its dead-server records must be deleted
+            removeDeadServerByHost(host, type);
+        } else if (ADD_ZK_OP.equals(opType)) {
+            String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
+            if (!super.isExisted(deadServerPath)) {
+                super.persist(deadServerPath, (type + UNDERLINE + host));
+                logger.info("{} server dead , and {} added to zk dead server path success",
+                        zkNodeType, zNode);
+            }
+        } else {
+            logger.warn("unknown dead server operation type: {}, zNode: {}", opType, zNode);
+        }
+    }
+
+    /**
+     * Add or remove the dead-server records of several nodes, exactly as
+     * {@link #handleDeadServer(String, ZKNodeType, String)} does for each node in turn.
+     *
+     * @param zNodeSet   node paths; {@code null} is treated as empty
+     * @param zkNodeType {@code MASTER} or worker
+     * @param opType     {@code ADD_ZK_OP} or {@code DELETE_ZK_OP}
+     * @throws Exception if a zookeeper operation fails
+     */
+    public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception {
+        if (zNodeSet == null) {
+            return;
+        }
+        for (String zNode : zNodeSet) {
+            handleDeadServer(zNode, zkNodeType, opType);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
index 5bdc6f8..57ac13e 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
@@ -52,6 +52,13 @@ public class ZookeeperConfig {
     @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
     private String dsRoot;
 
+    /**
+     * Value of property zookeeper.max.wait.time (default 10000).
+     * NOTE(review): its consumer is not shown here; presumably milliseconds — confirm.
+     */
+    @Value("${zookeeper.max.wait.time:10000}")
+    private int maxWaitTime;
+
     public String getServerList() {
         return serverList;
     }
@@ -115,4 +122,12 @@ public class ZookeeperConfig {
     public void setDsRoot(String dsRoot) {
         this.dsRoot = dsRoot;
     }
+
+    public int getMaxWaitTime() {
+        return maxWaitTime;
+    }
+
+    public void setMaxWaitTime(int maxWaitTime) {
+        this.maxWaitTime = maxWaitTime;
+    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
new file mode 100644
index 0000000..f828c07
--- /dev/null
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * register operator test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class RegisterOperatorTest {
+
+    private static ZKServer zkServer;
+
+    @InjectMocks
+    private RegisterOperator registerOperator;
+
+    @Mock
+    private ZookeeperConfig zookeeperConfig;
+
+    private static final String DS_ROOT = "/dolphinscheduler";
+    private static final String MASTER_NODE = "127.0.0.1:5678";
+    /** name of the dead-server record that handleDeadServer creates for MASTER_NODE */
+    private static final String DEAD_MASTER_NODE = String.format("%s_%s", Constants.MASTER_PREFIX, MASTER_NODE);
+
+    @Before
+    public void before() {
+        // start the embedded zookeeper asynchronously; connectRegisterOperator() sleeps to give it time to come up
+        new Thread(() -> {
+            if (zkServer == null) {
+                zkServer = new ZKServer();
+            }
+            zkServer.startLocalZkServer(2185);
+        }).start();
+    }
+
+    @After
+    public void after() {
+        if (zkServer != null) {
+            zkServer.stop();
+        }
+    }
+
+    /**
+     * Wait for the embedded zookeeper, stub the config and connect the operator under test.
+     * Shared by all tests so that none of them has to call another {@code @Test} method.
+     */
+    private void connectRegisterOperator() throws Exception {
+        TimeUnit.SECONDS.sleep(10);
+        Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185");
+        Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100);
+        Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10);
+        Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000);
+        Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000);
+        Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000);
+        Mockito.when(zookeeperConfig.getDigest()).thenReturn("");
+        Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT);
+        Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000);
+
+        registerOperator.afterPropertiesSet();
+    }
+
+    @Test
+    public void testAfterPropertiesSet() throws Exception {
+        connectRegisterOperator();
+        Assert.assertNotNull(registerOperator.getZkClient());
+    }
+
+    @Test
+    public void testGetDeadZNodeParentPath() throws Exception {
+        connectRegisterOperator();
+        String path = registerOperator.getDeadZNodeParentPath();
+
+        Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path);
+    }
+
+    @Test
+    public void testHandleDeadServer() throws Exception {
+        connectRegisterOperator();
+        registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER, Constants.ADD_ZK_OP);
+        String path = registerOperator.getDeadZNodeParentPath();
+        Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(DEAD_MASTER_NODE));
+    }
+
+    @Test
+    public void testHandleDeadServerSet() throws Exception {
+        connectRegisterOperator();
+        String path = registerOperator.getDeadZNodeParentPath();
+
+        registerOperator.handleDeadServer(Collections.singleton(MASTER_NODE), ZKNodeType.MASTER, Constants.ADD_ZK_OP);
+        Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(DEAD_MASTER_NODE));
+
+        registerOperator.handleDeadServer(Collections.singleton(MASTER_NODE), ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
+        Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(DEAD_MASTER_NODE));
+    }
+
+    @Test
+    public void testRemoveDeadServerByHost() throws Exception {
+        connectRegisterOperator();
+        String path = registerOperator.getDeadZNodeParentPath();
+
+        registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER, Constants.ADD_ZK_OP);
+        Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(DEAD_MASTER_NODE));
+
+        registerOperator.removeDeadServerByHost(MASTER_NODE, Constants.MASTER_PREFIX);
+        Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(DEAD_MASTER_NODE));
+    }
+}
diff --git a/pom.xml b/pom.xml
index aff1f98..fd0ea39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -817,6 +817,7 @@
                         
<include>**/server/master/MasterExecThreadTest.java</include> -->
                         <include>**/server/master/ParamsTest.java</include>
                         
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
+                        
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         
<include>**/server/utils/ExecutionContextTestUtils.java</include>
                         
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
@@ -838,6 +839,8 @@
                         
<include>**/service/quartz/cron/CronUtilsTest.java</include>
                         
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
                         <include>**/service/zk/ZKServerTest.java</include>
+                        
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
+                        
<include>**/service/zk/RegisterOperatorTest.java</include>
                         
<include>**/service/queue/TaskUpdateQueueTest.java</include>
                         
<include>**/service/queue/TaskPriorityTest.java</include>
                         
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>

Reply via email to