This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.1-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.1-release by this push:
new 9a307dc fix failover logic (#7276)
9a307dc is described below
commit 9a307dc71abbe66c52438897092b0f5daa5e5e55
Author: wind <[email protected]>
AuthorDate: Wed Dec 8 23:52:40 2021 +0800
fix failover logic (#7276)
---
.../master/registry/MasterRegistryClient.java | 140 +++++++++++++--------
1 file changed, 89 insertions(+), 51 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 318a06f..ae2b969 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -47,7 +47,9 @@ import org.apache.commons.lang.StringUtils;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -194,7 +196,7 @@ public class MasterRegistryClient {
failoverMaster(serverHost);
break;
case WORKER:
- failoverWorker(serverHost, true, true);
+ failoverWorker(serverHost);
break;
default:
break;
@@ -275,78 +277,114 @@ public class MasterRegistryClient {
* 3. failover all tasks when workerHost is null
*
* @param workerHost worker host
- * @param needCheckWorkerAlive need check worker alive
- * @param checkOwner need check process instance owner
*/
- private void failoverWorker(String workerHost, boolean
needCheckWorkerAlive, boolean checkOwner) {
- logger.info("start worker[{}] failover ...", workerHost);
+ private void failoverWorker(String workerHost) {
+ if (StringUtils.isEmpty(workerHost)) {
+ return;
+ }
+
+ long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList =
processService.queryNeedFailoverTaskInstances(workerHost);
- for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
- if (needCheckWorkerAlive) {
- if (!checkTaskInstanceNeedFailover(taskInstance)) {
- continue;
- }
- }
+ Map<Integer, ProcessInstance> processInstanceCacheMap = new
HashMap<>();
+ logger.info("start worker[{}] failover, task list size:{}",
workerHost, needFailoverTaskInstanceList.size());
- ProcessInstance processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- if (workerHost == null
- || !checkOwner
- ||
processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
- // only failover the task owned myself if worker down.
+ for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+ ProcessInstance processInstance =
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
+ if (processInstance == null) {
+ processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
- logger.error("failover error, the process {} of task {} do
not exists.",
+ logger.error("failover task instance error,
processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(),
taskInstance.getId());
continue;
}
- taskInstance.setProcessInstance(processInstance);
-
- TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
- // only kill yarn job if exists , the local thread has exited
- ProcessUtils.killYarnJob(taskExecutionContext);
-
- taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
- if
(!processInstanceExecMaps.containsKey(processInstance.getId())) {
- return;
- }
- WorkflowExecuteThread workflowExecuteThreadNotify =
processInstanceExecMaps.get(processInstance.getId());
- StateEvent stateEvent = new StateEvent();
- stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- stateEvent.setProcessInstanceId(processInstance.getId());
- stateEvent.setExecutionStatus(taskInstance.getState());
- workflowExecuteThreadNotify.addStateEvent(stateEvent);
+ processInstanceCacheMap.put(processInstance.getId(),
processInstance);
}
+ // only failover the task owned myself if worker down.
+ if (processInstance.getHost().equalsIgnoreCase(getLocalAddress()))
{
+ logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+ failoverTaskInstance(processInstance, taskInstance);
+ }
}
- logger.info("end worker[{}] failover ...", workerHost);
+ logger.info("end worker[{}] failover, useTime:{}ms", workerHost,
System.currentTimeMillis() - startTime);
}
/**
- * failover master tasks
+ * failover master
+ * <p>
+ * failover process instance and associated task instance
*
* @param masterHost master host
*/
private void failoverMaster(String masterHost) {
- logger.info("start master failover ...");
+ if (StringUtils.isEmpty(masterHost)) {
+ return;
+ }
+ long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
+ logger.info("start master[{}] failover, process list size:{}",
masterHost, needFailoverProcessInstanceList.size());
- logger.info("failover process list size:{} ",
needFailoverProcessInstanceList.size());
- //updateProcessInstance host is null and insert into command
for (ProcessInstance processInstance :
needFailoverProcessInstanceList) {
- logger.info("failover process instance id: {} host:{}",
processInstance.getId(), processInstance.getHost());
if (Constants.NULL.equals(processInstance.getHost())) {
continue;
}
+
+ logger.info("failover process instance id: {}",
processInstance.getId());
+
+ List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
+ for (TaskInstance taskInstance : validTaskInstanceList) {
+ if (Constants.NULL.equals(taskInstance.getHost())) {
+ continue;
+ }
+ logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+ failoverTaskInstance(processInstance, taskInstance);
+ }
+ //updateProcessInstance host is null and insert into command
processService.processNeedFailoverProcessInstances(processInstance);
}
- failoverWorker(masterHost, true, false);
- logger.info("master failover end");
+ logger.info("master[{}] failover end, useTime:{}ms", masterHost,
System.currentTimeMillis() - startTime);
+ }
+
+ private void failoverTaskInstance(ProcessInstance processInstance,
TaskInstance taskInstance) {
+ if (taskInstance == null) {
+ logger.error("failover task instance error, taskInstance is null");
+ return;
+ }
+
+ if (processInstance == null) {
+ logger.error("failover task instance error, processInstance {} of
taskInstance {} is null",
+ taskInstance.getProcessInstanceId(), taskInstance.getId());
+ return;
+ }
+
+ if (!checkTaskInstanceNeedFailover(taskInstance)) {
+ return;
+ }
+
+ taskInstance.setProcessInstance(processInstance);
+ TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
+
+ // only kill yarn job if exists , the local thread has exited
+ ProcessUtils.killYarnJob(taskExecutionContext);
+
+ taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+ processService.saveTaskInstance(taskInstance);
+
+ WorkflowExecuteThread workflowExecuteThreadNotify =
processInstanceExecMaps.get(processInstance.getId());
+ if (workflowExecuteThreadNotify == null) {
+ return;
+ }
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setTaskInstanceId(taskInstance.getId());
+ stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+ stateEvent.setProcessInstanceId(processInstance.getId());
+ stateEvent.setExecutionStatus(taskInstance.getState());
+ workflowExecuteThreadNotify.addStateEvent(stateEvent);
}
/**
@@ -357,11 +395,11 @@ public class MasterRegistryClient {
localNodePath = getMasterPath();
int masterHeartbeatInterval =
masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMasterMaxCpuloadAvg(),
- masterConfig.getMasterReservedMemory(),
- Sets.newHashSet(getMasterPath()),
- Constants.MASTER_TYPE,
- registryClient);
+ masterConfig.getMasterMaxCpuloadAvg(),
+ masterConfig.getMasterReservedMemory(),
+ Sets.newHashSet(getMasterPath()),
+ Constants.MASTER_TYPE,
+ registryClient);
registryClient.persistEphemeral(localNodePath,
heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(this::handleConnectionState);