This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
     new 077f4d1  [1.3.6-prepare][Fix-#4840][worker] fix master fault tolerance when startup #4845 (#4916)
077f4d1 is described below

commit 077f4d120902288fd3a13f6e7cb3603d16b6f4c3
Author: Kirs <[email protected]>
AuthorDate: Tue Mar 9 15:58:58 2021 +0800

    [1.3.6-prepare][Fix-#4840][worker] fix master fault tolerance when startup #4845 (#4916)
    
    * [1.3.6-prepare][Fix-#4840][worker] fix master fault tolerance when startup #4845
    
    * code style
---
 .../server/master/MasterServer.java                |  20 +-
 .../dolphinscheduler/server/zk/ZKMasterClient.java | 628 +++++++++++----------
 2 files changed, 328 insertions(+), 320 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index d863742..18882a2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -25,12 +25,14 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.server.worker.WorkerServer;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+
+import javax.annotation.PostConstruct;
+
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,8 +42,6 @@ import 
org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.FilterType;
 
-import javax.annotation.PostConstruct;
-
 
 
 
@@ -74,12 +74,6 @@ public class MasterServer {
     private NettyRemotingServer nettyRemotingServer;
 
     /**
-     * master registry
-     */
-    @Autowired
-    private MasterRegistry masterRegistry;
-
-    /**
      * zk master client
      */
     @Autowired
@@ -117,14 +111,11 @@ public class MasterServer {
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
         this.nettyRemotingServer.start();
 
-        // register
-        this.masterRegistry.registry();
-
         // self tolerant
         this.zkMasterClient.start();
 
-        //
-        masterSchedulerService.start();
+        // scheduler start
+        this.masterSchedulerService.start();
 
         // start QuartzExecutors
         // what system should do if exception
@@ -178,7 +169,6 @@ public class MasterServer {
             //
             this.masterSchedulerService.close();
             this.nettyRemotingServer.close();
-            this.masterRegistry.unRegistry();
             this.zkMasterClient.close();
             //close quartz
             try{
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index dbba51b..7f74c8c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -30,9 +31,11 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -45,314 +48,329 @@ import static 
org.apache.dolphinscheduler.common.Constants.*;
 
 
 /**
- *  zookeeper master client
- *
- *  single instance
+ * zookeeper master client
+ * <p>
+ * single instance
  */
 @Component
 public class ZKMasterClient extends AbstractZKClient {
 
-       /**
-        * logger
-        */
-       private static final Logger logger = 
LoggerFactory.getLogger(ZKMasterClient.class);
-
-       /**
-        *  process service
-        */
-       @Autowired
-       private ProcessService processService;
-
-       public void start() {
-
-               InterProcessMutex mutex = null;
-               try {
-                       // create distributed lock with the root node path of 
the lock space as /dolphinscheduler/lock/failover/master
-                       String znodeLock = getMasterStartUpLockPath();
-                       mutex = new InterProcessMutex(getZkClient(), znodeLock);
-                       mutex.acquire();
-
-                       // init system znode
-                       this.initSystemZNode();
-
-                       while (!checkZKNodeExists(OSUtils.getHost(), 
ZKNodeType.MASTER)){
-                               ThreadUtils.sleep(SLEEP_TIME_MILLIS);
-                       }
-                       // startup tolerant
-                       if (getActiveMasterNum() == 1) {
-                               removeZKNodePath(null, ZKNodeType.MASTER, true);
-                               removeZKNodePath(null, ZKNodeType.WORKER, true);
-                       }
-                       registerListener();
-               }catch (Exception e){
-                       logger.error("master start up exception",e);
-               }finally {
-                       releaseMutex(mutex);
-               }
-       }
-
-       @Override
-       public void close(){
-               super.close();
-       }
-
-       /**
-        * handle path events that this class cares about
-        * @param client   zkClient
-        * @param event    path event
-        * @param path     zk path
-        */
-       @Override
-       protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
-               //monitor master
-               
if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
-                       handleMasterEvent(event,path);
-               }else 
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
-                       //monitor worker
-                       handleWorkerEvent(event,path);
-               }
-       }
-
-       /**
-        * remove zookeeper node path
-        *
-        * @param path                  zookeeper node path
-        * @param zkNodeType    zookeeper node type
-        * @param failover              is failover
-        */
-       private void removeZKNodePath(String path, ZKNodeType zkNodeType, 
boolean failover) {
-               logger.info("{} node deleted : {}", zkNodeType.toString(), 
path);
-               InterProcessMutex mutex = null;
-               try {
-                       String failoverPath = getFailoverLockPath(zkNodeType);
-                       // create a distributed lock
-                       mutex = new InterProcessMutex(getZkClient(), 
failoverPath);
-                       mutex.acquire();
-
-                       String serverHost = null;
-                       if(StringUtils.isNotEmpty(path)){
-                               serverHost = getHostByEventDataPath(path);
-                               if(StringUtils.isEmpty(serverHost)){
-                                       logger.error("server down error: 
unknown path: {}", path);
-                                       return;
-                               }
-                               // handle dead server
-                               handleDeadServer(path, zkNodeType, 
Constants.ADD_ZK_OP);
-                       }
-                       //failover server
-                       if(failover){
-                               failoverServerWhenDown(serverHost, zkNodeType);
-                       }
-               }catch (Exception e){
-                       logger.error("{} server failover failed.", 
zkNodeType.toString());
-                       logger.error("failover exception ",e);
-               }
-               finally {
-                       releaseMutex(mutex);
-               }
-       }
-
-       /**
-        * failover server when server down
-        *
-        * @param serverHost    server host
-        * @param zkNodeType    zookeeper node type
-        * @throws Exception    exception
-        */
-       private void failoverServerWhenDown(String serverHost, ZKNodeType 
zkNodeType) throws Exception {
-               switch (zkNodeType) {
-                       case MASTER:
-                               failoverMaster(serverHost);
-                               break;
-                       case WORKER:
-                               failoverWorker(serverHost, true);
-                       default:
-                               break;
-               }
-       }
-
-       /**
-        * get failover lock path
-        *
-        * @param zkNodeType zookeeper node type
-        * @return fail over lock path
-        */
-       private String getFailoverLockPath(ZKNodeType zkNodeType){
-
-               switch (zkNodeType){
-                       case MASTER:
-                               return getMasterFailoverLockPath();
-                       case WORKER:
-                               return getWorkerFailoverLockPath();
-                       default:
-                               return "";
-               }
-       }
-
-       /**
-        * monitor master
-        * @param event event
-        * @param path path
-        */
-       public void handleMasterEvent(TreeCacheEvent event, String path){
-               switch (event.getType()) {
-                       case NODE_ADDED:
-                               logger.info("master node added : {}", path);
-                               break;
-                       case NODE_REMOVED:
-                               removeZKNodePath(path, ZKNodeType.MASTER, true);
-                               break;
-                       default:
-                               break;
-               }
-       }
-
-       /**
-        * monitor worker
-        * @param event event
-        * @param path path
-        */
-       public void handleWorkerEvent(TreeCacheEvent event, String path){
-               switch (event.getType()) {
-                       case NODE_ADDED:
-                               logger.info("worker node added : {}", path);
-                               break;
-                       case NODE_REMOVED:
-                               logger.info("worker node deleted : {}", path);
-                               removeZKNodePath(path, ZKNodeType.WORKER, true);
-                               break;
-                       default:
-                               break;
-               }
-       }
-
-       /**
-        * task needs failover if task start before worker starts
-        *
-        * @param taskInstance task instance
-        * @return true if task instance need fail over
-        */
-       private boolean checkTaskInstanceNeedFailover(TaskInstance 
taskInstance) throws Exception {
-
-               boolean taskNeedFailover = true;
-
-               //now no host will execute this task instance,so no need to 
failover the task
-               if(taskInstance.getHost() == null){
-                       return false;
-               }
-
-               // if the worker node exists in zookeeper, we must check the 
task starts after the worker
-               if(checkZKNodeExists(taskInstance.getHost(), 
ZKNodeType.WORKER)){
-                       //if task start after worker starts, there is no need 
to failover the task.
-                       if(checkTaskAfterWorkerStart(taskInstance)){
-                               taskNeedFailover = false;
-                       }
-               }
-               return taskNeedFailover;
-       }
-
-       /**
-        * check task start after the worker server starts.
-        *
-        * @param taskInstance task instance
-        * @return true if task instance start time after worker server start 
date
-        */
-       private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
-               if(StringUtils.isEmpty(taskInstance.getHost())){
-                       return false;
-               }
-               Date workerServerStartDate = null;
-               List<Server> workerServers = getServersList(ZKNodeType.WORKER);
-               for(Server workerServer : workerServers){
-                       if(taskInstance.getHost().equals(workerServer.getHost() 
+ Constants.COLON + workerServer.getPort())){
-                               workerServerStartDate = 
workerServer.getCreateTime();
-                               break;
-                       }
-               }
-
-               if(workerServerStartDate != null){
-                       return 
taskInstance.getStartTime().after(workerServerStartDate);
-               }else{
-                       return false;
-               }
-       }
-
-       /**
-        * failover worker tasks
-        *
-        * 1. kill yarn job if there are yarn jobs in tasks.
-        * 2. change task state from running to need failover.
-        * 3. failover all tasks when workerHost is null
-        * @param workerHost worker host
-        */
-
-       /**
-        * failover worker tasks
-        *
-        * 1. kill yarn job if there are yarn jobs in tasks.
-        * 2. change task state from running to need failover.
-        * 3. failover all tasks when workerHost is null
-        * @param workerHost                    worker host
-        * @param needCheckWorkerAlive  need check worker alive
-        * @throws Exception                    exception
-        */
-       private void failoverWorker(String workerHost, boolean 
needCheckWorkerAlive) throws Exception {
-               logger.info("start worker[{}] failover ...", workerHost);
-
-               List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
-               for(TaskInstance taskInstance : needFailoverTaskInstanceList){
-                       if(needCheckWorkerAlive){
-                               
if(!checkTaskInstanceNeedFailover(taskInstance)){
-                                       continue;
-                               }
-                       }
-
-                       ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
-                       if(processInstance != null){
-                               
taskInstance.setProcessInstance(processInstance);
-                       }
-
-                       TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
-                                       
.buildTaskInstanceRelatedInfo(taskInstance)
-                                       
.buildProcessInstanceRelatedInfo(processInstance)
-                                       .create();
-                       // only kill yarn job if exists , the local thread has 
exited
-                       ProcessUtils.killYarnJob(taskExecutionContext);
-
-                       
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
-                       processService.saveTaskInstance(taskInstance);
-               }
-               logger.info("end worker[{}] failover ...", workerHost);
-       }
-
-       /**
-        * failover master tasks
-        *
-        * @param masterHost master host
-        */
-       private void failoverMaster(String masterHost) {
-               logger.info("start master failover ...");
-
-               List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
-
-               logger.info("failover process list size:{} ", 
needFailoverProcessInstanceList.size());
-               //updateProcessInstance host is null and insert into command
-               for(ProcessInstance processInstance : 
needFailoverProcessInstanceList){
-                       logger.info("failover process instance id: {} host:{}",
-                                       processInstance.getId(), 
processInstance.getHost());
-                       if(Constants.NULL.equals(processInstance.getHost()) ){
-                               continue;
-                       }
-                       
processService.processNeedFailoverProcessInstances(processInstance);
-               }
-
-               logger.info("master failover end");
-       }
-
-       public InterProcessMutex blockAcquireMutex() throws Exception {
-               InterProcessMutex mutex = new InterProcessMutex(getZkClient(), 
getMasterLockPath());
-               mutex.acquire();
-               return mutex;
-       }
+    /**
+     * logger
+     */
+    private static final Logger logger = 
LoggerFactory.getLogger(ZKMasterClient.class);
+
+    /**
+     * process service
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * master registry
+     */
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    public void start() {
+
+        InterProcessMutex mutex = null;
+        try {
+            // create distributed lock with the root node path of the lock 
space as /dolphinscheduler/lock/failover/master
+            String znodeLock = getMasterStartUpLockPath();
+            mutex = new InterProcessMutex(getZkClient(), znodeLock);
+            mutex.acquire();
+
+            //  Master registry
+            masterRegistry.registry();
+
+            // init system znode
+            this.initSystemZNode();
+
+            while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)) {
+                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+            }
+            // startup tolerant
+            if (getActiveMasterNum() == 1) {
+                removeZKNodePath(null, ZKNodeType.MASTER, true);
+                removeZKNodePath(null, ZKNodeType.WORKER, true);
+            }
+            registerListener();
+        } catch (Exception e) {
+            logger.error("master start up exception", e);
+        } finally {
+            releaseMutex(mutex);
+        }
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        masterRegistry.unRegistry();
+    }
+
+    /**
+     * handle path events that this class cares about
+     *
+     * @param client zkClient
+     * @param event path event
+     * @param path zk path
+     */
+    @Override
+    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, 
String path) {
+        //monitor master
+        if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + 
Constants.SINGLE_SLASH)) {
+            handleMasterEvent(event, path);
+        } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + 
Constants.SINGLE_SLASH)) {
+            //monitor worker
+            handleWorkerEvent(event, path);
+        }
+    }
+
+    /**
+     * remove zookeeper node path
+     *
+     * @param path zookeeper node path
+     * @param zkNodeType zookeeper node type
+     * @param failover is failover
+     */
+    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean 
failover) {
+        logger.info("{} node deleted : {}", zkNodeType.toString(), path);
+        InterProcessMutex mutex = null;
+        try {
+            String failoverPath = getFailoverLockPath(zkNodeType);
+            // create a distributed lock
+            mutex = new InterProcessMutex(getZkClient(), failoverPath);
+            mutex.acquire();
+
+            String serverHost = null;
+            if (StringUtils.isNotEmpty(path)) {
+                serverHost = getHostByEventDataPath(path);
+                if (StringUtils.isEmpty(serverHost)) {
+                    logger.error("server down error: unknown path: {}", path);
+                    return;
+                }
+                // handle dead server
+                handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
+            }
+            //failover server
+            if (failover) {
+                failoverServerWhenDown(serverHost, zkNodeType);
+            }
+        } catch (Exception e) {
+            logger.error("{} server failover failed.", zkNodeType.toString());
+            logger.error("failover exception ", e);
+        } finally {
+            releaseMutex(mutex);
+        }
+    }
+
+    /**
+     * failover server when server down
+     *
+     * @param serverHost server host
+     * @param zkNodeType zookeeper node type
+     * @throws Exception exception
+     */
+    private void failoverServerWhenDown(String serverHost, ZKNodeType 
zkNodeType) throws Exception {
+        if (StringUtils.isEmpty(serverHost)) {
+            return;
+        }
+        switch (zkNodeType) {
+            case MASTER:
+                failoverMaster(serverHost);
+                break;
+            case WORKER:
+                failoverWorker(serverHost, true);
+            default:
+                break;
+        }
+    }
+
+    /**
+     * get failover lock path
+     *
+     * @param zkNodeType zookeeper node type
+     * @return fail over lock path
+     */
+    private String getFailoverLockPath(ZKNodeType zkNodeType) {
+
+        switch (zkNodeType) {
+            case MASTER:
+                return getMasterFailoverLockPath();
+            case WORKER:
+                return getWorkerFailoverLockPath();
+            default:
+                return "";
+        }
+    }
+
+    /**
+     * monitor master
+     *
+     * @param event event
+     * @param path path
+     */
+    public void handleMasterEvent(TreeCacheEvent event, String path) {
+        switch (event.getType()) {
+            case NODE_ADDED:
+                logger.info("master node added : {}", path);
+                break;
+            case NODE_REMOVED:
+                removeZKNodePath(path, ZKNodeType.MASTER, true);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * monitor worker
+     *
+     * @param event event
+     * @param path path
+     */
+    public void handleWorkerEvent(TreeCacheEvent event, String path) {
+        switch (event.getType()) {
+            case NODE_ADDED:
+                logger.info("worker node added : {}", path);
+                break;
+            case NODE_REMOVED:
+                logger.info("worker node deleted : {}", path);
+                removeZKNodePath(path, ZKNodeType.WORKER, true);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * task needs failover if task start before worker starts
+     *
+     * @param taskInstance task instance
+     * @return true if task instance need fail over
+     */
+    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) 
throws Exception {
+
+        boolean taskNeedFailover = true;
+
+        //now no host will execute this task instance,so no need to failover 
the task
+        if (taskInstance.getHost() == null) {
+            return false;
+        }
+
+        // if the worker node exists in zookeeper, we must check the task 
starts after the worker
+        if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)) {
+            //if task start after worker starts, there is no need to failover 
the task.
+            if (checkTaskAfterWorkerStart(taskInstance)) {
+                taskNeedFailover = false;
+            }
+        }
+        return taskNeedFailover;
+    }
+
+    /**
+     * check task start after the worker server starts.
+     *
+     * @param taskInstance task instance
+     * @return true if task instance start time after worker server start date
+     */
+    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return false;
+        }
+        Date workerServerStartDate = null;
+        List<Server> workerServers = getServersList(ZKNodeType.WORKER);
+        for (Server workerServer : workerServers) {
+            if (taskInstance.getHost().equals(workerServer.getHost() + 
Constants.COLON + workerServer.getPort())) {
+                workerServerStartDate = workerServer.getCreateTime();
+                break;
+            }
+        }
+
+        if (workerServerStartDate != null) {
+            return taskInstance.getStartTime().after(workerServerStartDate);
+        }
+        return false;
+    }
+
+    /**
+     * failover worker tasks
+     *
+     * 1. kill yarn job if there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. failover all tasks when workerHost is null
+     * @param workerHost worker host
+     */
+
+    /**
+     * failover worker tasks
+     * <p>
+     * 1. kill yarn job if there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. failover all tasks when workerHost is null
+     *
+     * @param workerHost worker host
+     * @param needCheckWorkerAlive need check worker alive
+     * @throws Exception exception
+     */
+    private void failoverWorker(String workerHost, boolean 
needCheckWorkerAlive) throws Exception {
+        logger.info("start worker[{}] failover ...", workerHost);
+
+        List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
+        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+            if (needCheckWorkerAlive) {
+                if (!checkTaskInstanceNeedFailover(taskInstance)) {
+                    continue;
+                }
+            }
+
+            ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+            if (processInstance != null) {
+                taskInstance.setProcessInstance(processInstance);
+            }
+
+            TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
+                    .buildTaskInstanceRelatedInfo(taskInstance)
+                    .buildProcessInstanceRelatedInfo(processInstance)
+                    .create();
+            // only kill yarn job if exists , the local thread has exited
+            ProcessUtils.killYarnJob(taskExecutionContext);
+
+            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+            processService.saveTaskInstance(taskInstance);
+        }
+        logger.info("end worker[{}] failover ...", workerHost);
+    }
+
+    /**
+     * failover master tasks
+     *
+     * @param masterHost master host
+     */
+    private void failoverMaster(String masterHost) {
+        logger.info("start master failover ...");
+
+        List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
+
+        logger.info("failover process list size:{} ", 
needFailoverProcessInstanceList.size());
+        //updateProcessInstance host is null and insert into command
+        for (ProcessInstance processInstance : 
needFailoverProcessInstanceList) {
+            logger.info("failover process instance id: {} host:{}",
+                    processInstance.getId(), processInstance.getHost());
+            if (Constants.NULL.equals(processInstance.getHost())) {
+                continue;
+            }
+            
processService.processNeedFailoverProcessInstances(processInstance);
+        }
+
+        logger.info("master failover end");
+    }
+
+    public InterProcessMutex blockAcquireMutex() throws Exception {
+        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), 
getMasterLockPath());
+        mutex.acquire();
+        return mutex;
+    }
 
 }
\ No newline at end of file

Reply via email to