This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.1-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.1-release by this push:
     new 9a307dc  fix failover logic (#7276)
9a307dc is described below

commit 9a307dc71abbe66c52438897092b0f5daa5e5e55
Author: wind <[email protected]>
AuthorDate: Wed Dec 8 23:52:40 2021 +0800

    fix failover logic (#7276)
---
 .../master/registry/MasterRegistryClient.java      | 140 +++++++++++++--------
 1 file changed, 89 insertions(+), 51 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 318a06f..ae2b969 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -47,7 +47,9 @@ import org.apache.commons.lang.StringUtils;
 
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -194,7 +196,7 @@ public class MasterRegistryClient {
                 failoverMaster(serverHost);
                 break;
             case WORKER:
-                failoverWorker(serverHost, true, true);
+                failoverWorker(serverHost);
                 break;
             default:
                 break;
@@ -275,78 +277,114 @@ public class MasterRegistryClient {
      * 3. failover all tasks when workerHost is null
      *
      * @param workerHost worker host
-     * @param needCheckWorkerAlive need check worker alive
-     * @param checkOwner need check process instance owner
      */
-    private void failoverWorker(String workerHost, boolean 
needCheckWorkerAlive, boolean checkOwner) {
-        logger.info("start worker[{}] failover ...", workerHost);
+    private void failoverWorker(String workerHost) {
+        if (StringUtils.isEmpty(workerHost)) {
+            return;
+        }
+
+        long startTime = System.currentTimeMillis();
         List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
-        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
-            if (needCheckWorkerAlive) {
-                if (!checkTaskInstanceNeedFailover(taskInstance)) {
-                    continue;
-                }
-            }
+        Map<Integer, ProcessInstance> processInstanceCacheMap = new 
HashMap<>();
+        logger.info("start worker[{}] failover, task list size:{}", 
workerHost, needFailoverTaskInstanceList.size());
 
-            ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
-            if (workerHost == null
-                    || !checkOwner
-                    || 
processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
-                // only failover the task owned myself if worker down.
+        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+            ProcessInstance processInstance = 
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
+            if (processInstance == null) {
+                processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
                 if (processInstance == null) {
-                    logger.error("failover error, the process {} of task {} do 
not exists.",
+                    logger.error("failover task instance error, 
processInstance {} of taskInstance {} is null",
                             taskInstance.getProcessInstanceId(), 
taskInstance.getId());
                     continue;
                 }
-                taskInstance.setProcessInstance(processInstance);
-
-                TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
-                        .buildTaskInstanceRelatedInfo(taskInstance)
-                        .buildProcessInstanceRelatedInfo(processInstance)
-                        .create();
-                // only kill yarn job if exists , the local thread has exited
-                ProcessUtils.killYarnJob(taskExecutionContext);
-
-                taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
-                processService.saveTaskInstance(taskInstance);
-                if 
(!processInstanceExecMaps.containsKey(processInstance.getId())) {
-                    return;
-                }
-                WorkflowExecuteThread workflowExecuteThreadNotify = 
processInstanceExecMaps.get(processInstance.getId());
-                StateEvent stateEvent = new StateEvent();
-                stateEvent.setTaskInstanceId(taskInstance.getId());
-                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-                stateEvent.setProcessInstanceId(processInstance.getId());
-                stateEvent.setExecutionStatus(taskInstance.getState());
-                workflowExecuteThreadNotify.addStateEvent(stateEvent);
+                processInstanceCacheMap.put(processInstance.getId(), 
processInstance);
             }
 
+            // only failover the task owned myself if worker down.
+            if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) 
{
+                logger.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+                failoverTaskInstance(processInstance, taskInstance);
+            }
         }
-        logger.info("end worker[{}] failover ...", workerHost);
+        logger.info("end worker[{}] failover, useTime:{}ms", workerHost, 
System.currentTimeMillis() - startTime);
     }
 
     /**
-     * failover master tasks
+     * failover master
+     * <p>
+     * failover process instance and associated task instance
      *
      * @param masterHost master host
      */
     private void failoverMaster(String masterHost) {
-        logger.info("start master failover ...");
+        if (StringUtils.isEmpty(masterHost)) {
+            return;
+        }
 
+        long startTime = System.currentTimeMillis();
         List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
+        logger.info("start master[{}] failover, process list size:{}", 
masterHost, needFailoverProcessInstanceList.size());
 
-        logger.info("failover process list size:{} ", 
needFailoverProcessInstanceList.size());
-        //updateProcessInstance host is null and insert into command
         for (ProcessInstance processInstance : 
needFailoverProcessInstanceList) {
-            logger.info("failover process instance id: {} host:{}", 
processInstance.getId(), processInstance.getHost());
             if (Constants.NULL.equals(processInstance.getHost())) {
                 continue;
             }
+
+            logger.info("failover process instance id: {}", 
processInstance.getId());
+
+            List<TaskInstance> validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
+            for (TaskInstance taskInstance : validTaskInstanceList) {
+                if (Constants.NULL.equals(taskInstance.getHost())) {
+                    continue;
+                }
+                logger.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+                failoverTaskInstance(processInstance, taskInstance);
+            }
+            //updateProcessInstance host is null and insert into command
             
processService.processNeedFailoverProcessInstances(processInstance);
         }
-        failoverWorker(masterHost, true, false);
 
-        logger.info("master failover end");
+        logger.info("master[{}] failover end, useTime:{}ms", masterHost, 
System.currentTimeMillis() - startTime);
+    }
+
+    private void failoverTaskInstance(ProcessInstance processInstance, 
TaskInstance taskInstance) {
+        if (taskInstance == null) {
+            logger.error("failover task instance error, taskInstance is null");
+            return;
+        }
+
+        if (processInstance == null) {
+            logger.error("failover task instance error, processInstance {} of 
taskInstance {} is null",
+                    taskInstance.getProcessInstanceId(), taskInstance.getId());
+            return;
+        }
+
+        if (!checkTaskInstanceNeedFailover(taskInstance)) {
+            return;
+        }
+
+        taskInstance.setProcessInstance(processInstance);
+        TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(processInstance)
+                .create();
+
+        // only kill yarn job if exists , the local thread has exited
+        ProcessUtils.killYarnJob(taskExecutionContext);
+
+        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+        processService.saveTaskInstance(taskInstance);
+
+        WorkflowExecuteThread workflowExecuteThreadNotify = 
processInstanceExecMaps.get(processInstance.getId());
+        if (workflowExecuteThreadNotify == null) {
+            return;
+        }
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        stateEvent.setProcessInstanceId(processInstance.getId());
+        stateEvent.setExecutionStatus(taskInstance.getState());
+        workflowExecuteThreadNotify.addStateEvent(stateEvent);
     }
 
     /**
@@ -357,11 +395,11 @@ public class MasterRegistryClient {
         localNodePath = getMasterPath();
         int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-            masterConfig.getMasterMaxCpuloadAvg(),
-            masterConfig.getMasterReservedMemory(),
-            Sets.newHashSet(getMasterPath()),
-            Constants.MASTER_TYPE,
-            registryClient);
+                masterConfig.getMasterMaxCpuloadAvg(),
+                masterConfig.getMasterReservedMemory(),
+                Sets.newHashSet(getMasterPath()),
+                Constants.MASTER_TYPE,
+                registryClient);
 
         registryClient.persistEphemeral(localNodePath, 
heartBeatTask.getHeartBeatInfo());
         registryClient.addConnectionStateListener(this::handleConnectionState);

Reply via email to