This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3f69ec8f28 [Fix-10842] Fix master/worker failover will cause status
incorrect (#10839)
3f69ec8f28 is described below
commit 3f69ec8f28c7206c153f5de8c7c0d1fa33d311f9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Jul 9 11:54:59 2022 +0800
[Fix-10842] Fix master/worker failover will cause status incorrect (#10839)
* Fix master failover not updating the task instance status
* Add some failover logs
* Fix worker failover rerunning a task more than once
* Fix workflowInstance failover possibly rerunning an already successful taskInstance
---
.../apache/dolphinscheduler/common/graph/DAG.java | 11 +
.../master/consumer/TaskPriorityQueueConsumer.java | 32 +-
.../master/dispatch/context/ExecutionContext.java | 45 +--
.../dispatch/executor/NettyExecutorManager.java | 3 +
.../processor/queue/TaskExecuteRunnable.java | 153 +++++----
.../registry/MasterRegistryDataListener.java | 1 +
.../master/runner/FailoverExecuteThread.java | 5 +-
.../master/runner/StateWheelExecuteThread.java | 12 +-
.../master/runner/WorkflowExecuteRunnable.java | 125 ++++---
.../master/runner/task/CommonTaskProcessor.java | 5 +-
.../server/master/service/FailoverService.java | 371 +--------------------
.../master/service/MasterFailoverService.java | 253 ++++++++++++++
.../master/service/WorkerFailoverService.java | 266 +++++++++++++++
.../master/dispatch/ExecutionContextTestUtils.java | 2 +-
.../executor/NettyExecutorManagerTest.java | 4 +-
.../server/master/service/FailoverServiceTest.java | 22 +-
.../service/process/ProcessService.java | 4 +-
.../service/process/ProcessServiceImpl.java | 42 ++-
.../service/queue/TaskPriority.java | 41 ++-
.../service/process/ProcessServiceTest.java | 5 +-
20 files changed, 852 insertions(+), 550 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
index e57b3dd93e..afa780163a 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
@@ -498,5 +498,16 @@ public class DAG<Node, NodeInfo, EdgeInfo> {
return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() ==
0, topoResultList);
}
+ @Override
+ public String toString() {
+ return "DAG{"
+ + "nodesMap="
+ + nodesMap
+ + ", edgesMap="
+ + edgesMap
+ + ", reverseEdgesMap="
+ + reverseEdgesMap
+ + '}';
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f04ae15dd8..82198e942a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -34,11 +34,11 @@ import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import
org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -187,8 +188,24 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
TaskMetrics.incTaskDispatch();
boolean result = false;
try {
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
+ if (workflowExecuteRunnable == null) {
+ logger.error("Cannot find the related processInstance of the
task, taskPriority: {}", taskPriority);
+ return true;
+ }
+ Optional<TaskInstance> taskInstanceOptional =
+
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
+ if (!taskInstanceOptional.isPresent()) {
+ logger.error("Cannot find the task instance from related
processInstance, taskPriority: {}",
+ taskPriority);
+ // we return true, so that we will drop this task.
+ return true;
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context =
taskPriority.getTaskExecutionContext();
- ExecutionContext executionContext = new
ExecutionContext(toCommand(context), ExecutorType.WORKER,
context.getWorkerGroup());
+ ExecutionContext executionContext =
+ new ExecutionContext(toCommand(context), ExecutorType.WORKER,
context.getWorkerGroup(), taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
@@ -196,16 +213,21 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
return true;
}
}
+
result = dispatcher.dispatch(executionContext);
if (result) {
- logger.info("Master success dispatch task to worker,
taskInstanceId: {}", taskPriority.getTaskId());
+ logger.info("Master success dispatch task to worker,
taskInstanceId: {}, worker: {}",
+ taskPriority.getTaskId(),
+ executionContext.getHost());
addDispatchEvent(context, executionContext);
} else {
- logger.info("Master failed to dispatch task to worker,
taskInstanceId: {}", taskPriority.getTaskId());
+ logger.info("Master failed to dispatch task to worker,
taskInstanceId: {}, worker: {}",
+ taskPriority.getTaskId(),
+ executionContext.getHost());
}
} catch (RuntimeException | ExecuteException e) {
- logger.error("Master dispatch task to worker error: ", e);
+ logger.error("Master dispatch task to worker error, taskPriority:
{}", taskPriority, e);
}
return result;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index b3fba87870..880640e91d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -19,13 +19,17 @@ package
org.apache.dolphinscheduler.server.master.dispatch.context;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import lombok.Data;
+
/**
* execution context
*/
+@Data
public class ExecutionContext {
/**
@@ -34,51 +38,30 @@ public class ExecutionContext {
private Host host;
/**
- * command
+ * command
*/
private final Command command;
+ private final TaskInstance taskInstance;
+
/**
- * executor type : worker or client
+ * executor type : worker or client
*/
private final ExecutorType executorType;
/**
- * worker group
+ * worker group
*/
- private String workerGroup;
+ private final String workerGroup;
- public ExecutionContext(Command command, ExecutorType executorType) {
- this(command, executorType, DEFAULT_WORKER_GROUP);
+ public ExecutionContext(Command command, ExecutorType executorType,
TaskInstance taskInstance) {
+ this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);
}
- public ExecutionContext(Command command, ExecutorType executorType, String
workerGroup) {
+ public ExecutionContext(Command command, ExecutorType executorType, String
workerGroup, TaskInstance taskInstance) {
this.command = command;
this.executorType = executorType;
this.workerGroup = workerGroup;
- }
-
- public Command getCommand() {
- return command;
- }
-
- public ExecutorType getExecutorType() {
- return executorType;
- }
-
- public void setWorkerGroup(String workerGroup) {
- this.workerGroup = workerGroup;
- }
-
- public String getWorkerGroup() {
- return this.workerGroup;
- }
-
- public Host getHost() {
- return host;
- }
-
- public void setHost(Host host) {
- this.host = host;
+ this.taskInstance = taskInstance;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 82f3416569..fe172f9816 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -117,6 +117,9 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
doExecute(host, command);
success = true;
context.setHost(host);
+ // We set the host to taskInstance to avoid when the worker
down, this taskInstance may not be failovered, due to the taskInstance's host
+ // is not belongs to the down worker ISSUE-10842.
+ context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error",
command), ex);
try {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index f2a6d6873e..9fc96e7564 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -27,6 +26,7 @@ import
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
@@ -76,7 +76,7 @@ public class TaskExecuteRunnable implements Runnable {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(),
event.getTaskInstanceId());
persist(event);
} catch (Exception e) {
- logger.error("persist error, event:{}, error: {}", event, e);
+ logger.error("persist task event error, event:{}", event, e);
} finally {
this.events.remove(event);
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -114,37 +114,44 @@ public class TaskExecuteRunnable implements Runnable {
*
* @param taskEvent taskEvent
*/
- private void persist(TaskEvent taskEvent) {
+ private void persist(TaskEvent taskEvent) throws Exception {
Event event = taskEvent.getEvent();
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
Optional<TaskInstance> taskInstance;
- WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable != null &&
workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
taskInstance =
workflowExecuteRunnable.getTaskInstance(taskInstanceId);
} else {
taskInstance =
Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
}
+ boolean needToSendEvent = true;
switch (event) {
case DISPATCH:
- handleDispatchEvent(taskEvent, taskInstance);
+ needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
// dispatch event do not need to submit state event
- return;
+ break;
case DELAY:
case RUNNING:
- handleRunningEvent(taskEvent, taskInstance);
+ needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
break;
case RESULT:
- handleResultEvent(taskEvent, taskInstance);
+ needToSendEvent = handleResultEvent(taskEvent, taskInstance);
break;
case WORKER_REJECT:
- handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance,
workflowExecuteRunnable);
+ needToSendEvent =
+ handleWorkerRejectEvent(taskEvent.getChannel(),
taskInstance, workflowExecuteRunnable);
break;
default:
throw new IllegalArgumentException("invalid event type : " +
event);
}
+ if (!needToSendEvent) {
+ logger.info("Handle task event: {} success, there is no need to
send a StateEvent", taskEvent);
+ return;
+ }
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
@@ -157,101 +164,101 @@ public class TaskExecuteRunnable implements Runnable {
/**
* handle dispatch event
*/
- private void handleDispatchEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
+ private boolean handleDispatchEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
if (!taskInstanceOptional.isPresent()) {
logger.error("taskInstance is null");
- return;
+ return false;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
- return;
+ return false;
}
taskInstance.setState(ExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
processService.saveTaskInstance(taskInstance);
+ return true;
}
/**
* handle running event
*/
- private void handleRunningEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
+ private boolean handleRunningEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
- try {
- if (taskInstanceOptional.isPresent()) {
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState().typeIsFinished()) {
- logger.warn("task is finish, running event is meaningless,
taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
- } else {
- taskInstance.setState(taskEvent.getState());
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- processService.saveTaskInstance(taskInstance);
- }
- }
- // if taskInstance is null (maybe deleted) or finish. retry will
be meaningless . so ack success
- // send ack to worker
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
-
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("handle worker ack master error", e);
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new
TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
- }
- }
-
- /**
- * handle result event
- */
- private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance>
taskInstanceOptional) {
- Channel channel = taskEvent.getChannel();
- try {
- if (taskInstanceOptional.isPresent()) {
- TaskInstance taskInstance = taskInstanceOptional.get();
- dataQualityResultOperator.operateDqExecuteResult(taskEvent,
taskInstance);
-
+ if (taskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().typeIsFinished()) {
+ logger.warn("task is finish, running event is meaningless,
taskInstanceId:{}, state:{}",
+ taskInstance.getId(),
+ taskInstance.getState());
+ return false;
+ } else {
+ taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
- taskInstance.setState(taskEvent.getState());
- taskInstance.setEndTime(taskEvent.getEndTime());
- taskInstance.setVarPool(taskEvent.getVarPool());
- processService.changeOutParam(taskInstance);
processService.saveTaskInstance(taskInstance);
}
- // if taskInstance is null (maybe deleted) . retry will be
meaningless . so response success
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new
TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
-
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("handle worker response master error", e);
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new
TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
+ // if taskInstance is null (maybe deleted) or finish. retry will be
meaningless . so ack success
+ // send ack to worker
+ TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
+ new
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+ return true;
}
/**
* handle result event
*/
- private void handleWorkerRejectEvent(Channel channel,
Optional<TaskInstance> taskInstanceOptional, WorkflowExecuteRunnable
executeThread) {
- TaskInstance taskInstance = taskInstanceOptional.orElseThrow(() -> new
RuntimeException("taskInstance is null"));
- try {
- if (executeThread != null) {
- executeThread.resubmit(taskInstance.getTaskCode());
- }
- if (channel != null) {
- TaskRecallAckCommand taskRecallAckCommand = new
TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
- channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+ private boolean handleResultEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
+ Channel channel = taskEvent.getChannel();
+ if (taskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().typeIsFinished()) {
+ logger.warn("The current taskInstance has already been
finished, taskEvent: {}", taskEvent);
+ return false;
}
- } catch (Exception e) {
- logger.error("handle worker reject error", e);
- TaskRecallAckCommand taskRecallAckCommand = new
TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId());
+
+ dataQualityResultOperator.operateDqExecuteResult(taskEvent,
taskInstance);
+
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setLogPath(taskEvent.getLogPath());
+ taskInstance.setExecutePath(taskEvent.getExecutePath());
+ taskInstance.setPid(taskEvent.getProcessId());
+ taskInstance.setAppLink(taskEvent.getAppIds());
+ taskInstance.setState(taskEvent.getState());
+ taskInstance.setEndTime(taskEvent.getEndTime());
+ taskInstance.setVarPool(taskEvent.getVarPool());
+ processService.changeOutParam(taskInstance);
+ processService.saveTaskInstance(taskInstance);
+ }
+ // if taskInstance is null (maybe deleted) . retry will be meaningless
. so response success
+ TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
+ new
TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+ return true;
+ }
+
+ /**
+ * handle result event
+ */
+ private boolean handleWorkerRejectEvent(Channel channel,
+ Optional<TaskInstance>
taskInstanceOptional,
+ WorkflowExecuteRunnable
executeThread) throws Exception {
+ TaskInstance taskInstance =
+ taskInstanceOptional.orElseThrow(() -> new
RuntimeException("taskInstance is null"));
+ if (executeThread != null) {
+ executeThread.resubmit(taskInstance.getTaskCode());
+ }
+ if (channel != null) {
+ TaskRecallAckCommand taskRecallAckCommand =
+ new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskInstance.getId());
channel.writeAndFlush(taskRecallAckCommand.convert2Command());
}
+ return true;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 30cea36db7..1eafb4cb54 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -64,6 +64,7 @@ public class MasterRegistryDataListener implements
SubscribeListener {
break;
case REMOVE:
masterRegistryClient.removeMasterNodePath(path,
NodeType.MASTER, true);
+
break;
default:
break;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 63f4215f27..16656c8760 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
+import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
* failover service
*/
@Autowired
- private FailoverService failoverService;
+ private MasterFailoverService masterFailoverService;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
@@ -63,7 +64,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
try {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a
master server start
- failoverService.checkMasterFailover();
+ masterFailoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("Master failover thread execute error", e);
} finally {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index c70fb1f1d4..843c1b246c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
@@ -31,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import java.util.Optional;
@@ -296,15 +296,21 @@ public class StateWheelExecuteThread extends
BaseDaemonThread {
}
if (!taskInstanceOptional.isPresent()) {
- logger.warn("Task instance retry check failed, can not
find taskInstance from workflowExecuteThread, will remove this check");
+ logger.warn(
+ "Task instance retry check failed, can not find
taskInstance from workflowExecuteThread, will remove this check");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.retryTaskIntervalOverTime()) {
+ // We check the status to avoid when we do worker failover we
submit a failover task, this task may be resubmit by this
+ // thread
+ if (taskInstance.getState() !=
ExecutionStatus.NEED_FAULT_TOLERANCE
+ && taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion:
TaskInstance.retryTaskIntervalOverTime,
WorkflowExecuteThread.cloneRetryTaskInstance
+ logger.info("[TaskInstance-{}]The task instance can retry,
will retry this task instance",
+ taskInstance.getId());
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 525aa6154e..32d1724b15 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -108,6 +108,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists;
@@ -141,7 +142,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
/**
* process instance
*/
- private ProcessInstance processInstance;
+ private final ProcessInstance processInstance;
/**
* process definition
@@ -298,6 +299,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
StateEventHandler stateEventHandler =
StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError("Cannot
find handler for the given state event"));
+ logger.info("Begin to handle state event, {}", stateEvent);
if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent);
}
@@ -483,8 +485,12 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
public void refreshProcessInstance(int processInstanceId) {
logger.info("process instance update: {}", processInstanceId);
- processInstance =
processService.findProcessInstanceById(processInstanceId);
- processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
+ ProcessInstance newProcessInstance =
processService.findProcessInstanceById(processInstanceId);
+ // just update the processInstance field(this is soft copy)
+ BeanUtils.copyProperties(newProcessInstance, processInstance);
+
+ processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@@ -770,6 +776,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
+ logger.info("Build dag success, dag: {}", dag);
}
/**
@@ -784,45 +791,60 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
errorTaskMap.clear();
if (!isNewProcessInstance()) {
+ logger.info("The workflowInstance is not a newly running instance,
runtimes: {}, recover flag: {}",
+ processInstance.getRunTimes(),
+ processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
- if (validTaskMap.containsKey(task.getTaskCode())) {
- int oldTaskInstanceId =
validTaskMap.get(task.getTaskCode());
- TaskInstance oldTaskInstance =
taskInstanceMap.get(oldTaskInstanceId);
- if (!oldTaskInstance.getState().typeIsFinished() &&
task.getState().typeIsFinished()) {
- task.setFlag(Flag.NO);
- processService.updateTaskInstance(task);
- continue;
+ try {
+
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(),
task.getId());
+ logger.info(
+ "Check the taskInstance from a exist workflowInstance,
existTaskInstanceCode: {}, taskInstanceStatus: {}",
+ task.getTaskCode(),
+ task.getState());
+ if (validTaskMap.containsKey(task.getTaskCode())) {
+ int oldTaskInstanceId =
validTaskMap.get(task.getTaskCode());
+ TaskInstance oldTaskInstance =
taskInstanceMap.get(oldTaskInstanceId);
+ if (!oldTaskInstance.getState().typeIsFinished() &&
task.getState().typeIsFinished()) {
+ task.setFlag(Flag.NO);
+ processService.updateTaskInstance(task);
+ continue;
+ }
+ logger.warn("have same taskCode taskInstance when init
task queue, taskCode:{}",
+ task.getTaskCode());
}
- logger.warn("have same taskCode taskInstance when init
task queue, taskCode:{}",
- task.getTaskCode());
- }
- validTaskMap.put(task.getTaskCode(), task.getId());
- taskInstanceMap.put(task.getId(), task);
+ validTaskMap.put(task.getTaskCode(), task.getId());
+ taskInstanceMap.put(task.getId(), task);
- if (task.isTaskComplete()) {
- completeTaskMap.put(task.getTaskCode(), task.getId());
- continue;
- }
- if (task.isConditionsTask() ||
DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
- continue;
- }
- if (task.taskCanRetry()) {
- if (task.getState() ==
ExecutionStatus.NEED_FAULT_TOLERANCE) {
- // tolerantTaskInstance add to standby list directly
- TaskInstance tolerantTaskInstance =
cloneTolerantTaskInstance(task);
- addTaskToStandByList(tolerantTaskInstance);
- } else {
- retryTaskInstance(task);
+ if (task.isTaskComplete()) {
+ completeTaskMap.put(task.getTaskCode(), task.getId());
+ continue;
}
- continue;
- }
- if (task.getState().typeIsFailure()) {
- errorTaskMap.put(task.getTaskCode(), task.getId());
+ if (task.isConditionsTask() ||
DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
+ dag)) {
+ continue;
+ }
+ if (task.taskCanRetry()) {
+ if (task.getState() ==
ExecutionStatus.NEED_FAULT_TOLERANCE) {
+ // tolerantTaskInstance add to standby list
directly
+ TaskInstance tolerantTaskInstance =
cloneTolerantTaskInstance(task);
+ addTaskToStandByList(tolerantTaskInstance);
+ } else {
+ retryTaskInstance(task);
+ }
+ continue;
+ }
+ if (task.getState().typeIsFailure()) {
+ errorTaskMap.put(task.getTaskCode(), task.getId());
+ }
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
+ } else {
+ logger.info("The current workflowInstance is a newly running
workflowInstance");
}
if (processInstance.isComplementData() &&
complementListDate.isEmpty()) {
@@ -849,15 +871,22 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
if (!complementListDate.isEmpty() && Flag.NO ==
processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
- String globalParams =
-
curingParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA,
- processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE));
+ String globalParams =
curingParamsService.curingGlobalParams(processInstance.getId(),
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA,
+ processInstance.getScheduleTime(),
+ cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance);
}
}
}
}
+ logger.info("Initialize task queue, dependFailedTaskMap: {},
completeTaskMap: {}, errorTaskMap: {}",
+ dependFailedTaskMap,
+ completeTaskMap,
+ errorTaskMap);
}
/**
@@ -899,6 +928,15 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(),
taskProcessor);
+ boolean dispatchSuccess =
taskProcessor.action(TaskAction.DISPATCH);
+ if (!dispatchSuccess) {
+ logger.error("process id:{} name:{} dispatch standby task
id:{} name:{} failed!",
+ processInstance.getId(),
+ processInstance.getName(),
+ taskInstance.getId(),
+ taskInstance.getName());
+ return Optional.empty();
+ }
taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance,
taskInstance);
@@ -1816,14 +1854,19 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
* is new process instance
*/
private boolean isNewProcessInstance() {
+ // An instance flagged for recovery is never treated as new, whatever its state or run count.
+ if (Flag.YES.equals(processInstance.getRecovery())) {
+ logger.info("This workInstance will be recover by this execution");
+ return false;
+ }
+
+ // Only an instance in RUNNING_EXECUTION whose runTimes is exactly 1 counts as new.
if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() &&
processInstance.getRunTimes() == 1) {
return true;
- } else if (processInstance.getRecovery().equals(Flag.YES)) {
- // host is empty use old task instance
- return false;
- } else {
- return false;
}
+ // Anything else is logged as a re-run and is not new.
+ logger.info(
+ "The workflowInstance has been executed before, this execution is
to reRun, processInstance status: {}, runTimes: {}",
+ processInstance.getState(),
+ processInstance.getRunTimes());
+ return false;
}
public void resubmit(long taskCode) throws Exception {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index fcfe7c67bf..06a72d1c88 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -67,7 +67,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
}
- dispatchTask();
return true;
}
@@ -119,7 +118,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is
already running or delayed.", taskInstance.getName());
return true;
}
- logger.info("task ready to submit: taskInstanceId: {}",
taskInstance.getId());
+ logger.info("task ready to dispatch to worker: taskInstanceId:
{}", taskInstance.getId());
TaskPriority taskPriority = new
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
@@ -167,7 +166,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
- ExecutionContext executionContext = new
ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
+ ExecutionContext executionContext = new
ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER,
taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 2d4bdfe5da..05a54e8529 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,46 +17,12 @@
package org.apache.dolphinscheduler.server.master.service;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
-import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
/**
@@ -65,41 +31,14 @@ import lombok.NonNull;
@Component
public class FailoverService {
private static final Logger LOGGER =
LoggerFactory.getLogger(FailoverService.class);
- private final RegistryClient registryClient;
- private final MasterConfig masterConfig;
- private final ProcessService processService;
- private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
- private final ProcessInstanceExecCacheManager cacheManager;
- private final String localAddress;
-
- public FailoverService(@NonNull RegistryClient registryClient,
- @NonNull MasterConfig masterConfig,
- @NonNull ProcessService processService,
- @NonNull WorkflowExecuteThreadPool
workflowExecuteThreadPool,
- @NonNull ProcessInstanceExecCacheManager
cacheManager) {
- this.registryClient = registryClient;
- this.masterConfig = masterConfig;
- this.processService = processService;
- this.workflowExecuteThreadPool = workflowExecuteThreadPool;
- this.cacheManager = cacheManager;
- this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
- }
- /**
- * check master failover
- */
- @Counted(value = "ds.master.scheduler.failover.check.count")
- @Timed(value = "ds.master.scheduler.failover.check.time", percentiles =
{0.5, 0.75, 0.95, 0.99}, histogram = true)
- public void checkMasterFailover() {
- List<String> hosts = getNeedFailoverMasterServers();
- if (CollectionUtils.isEmpty(hosts)) {
- return;
- }
- LOGGER.info("Master failover service {} begin to failover hosts:{}",
localAddress, hosts);
+ private final MasterFailoverService masterFailoverService;
+ private final WorkerFailoverService workerFailoverService;
- for (String host : hosts) {
- failoverMasterWithLock(host);
- }
+ /**
+ * Creates the failover facade from the two services it delegates to.
+ *
+ * @param masterFailoverService handles the failover of a master server
+ * @param workerFailoverService handles the failover of a worker server
+ */
+ public FailoverService(@NonNull MasterFailoverService
masterFailoverService,
+ @NonNull WorkerFailoverService
workerFailoverService) {
+ this.masterFailoverService = masterFailoverService;
+ this.workerFailoverService = workerFailoverService;
}
/**
@@ -111,304 +50,18 @@ public class FailoverService {
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
+ // Dispatch on the kind of node that went down; each branch logs before and after delegating.
switch (nodeType) {
case MASTER:
- failoverMasterWithLock(serverHost);
+ LOGGER.info("Master failover starting, masterServer: {}",
serverHost);
+ masterFailoverService.failoverMaster(serverHost);
+ LOGGER.info("Master failover finished, masterServer: {}",
serverHost);
break;
case WORKER:
- failoverWorker(serverHost);
+ LOGGER.info("Worker failover starting, workerServer: {}",
serverHost);
+ workerFailoverService.failoverWorker(serverHost);
+ LOGGER.info("Worker failover finished, workerServer: {}",
serverHost);
break;
default:
+ // Any other node type is ignored.
break;
}
}
- private void failoverMasterWithLock(String masterHost) {
- String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
- try {
- registryClient.getLock(failoverPath);
- this.failoverMaster(masterHost);
- } catch (Exception e) {
- LOGGER.error("{} server failover failed, host:{}",
NodeType.MASTER, masterHost, e);
- } finally {
- registryClient.releaseLock(failoverPath);
- }
- }
-
- /**
- * Failover master, will failover process instance and associated task
instance.
- * <p>When the process instance belongs to the given masterHost and the
restartTime is before the current server start up time,
- * then the process instance will be failovered.
- *
- * @param masterHost master host
- */
- private void failoverMaster(String masterHost) {
- if (StringUtils.isEmpty(masterHost)) {
- return;
- }
- Date serverStartupTime = getServerStartupTime(NodeType.MASTER,
masterHost);
- StopWatch failoverTimeCost = StopWatch.createStarted();
- List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
- LOGGER.info("start master[{}] failover, need to failover process list
size:{}", masterHost, needFailoverProcessInstanceList.size());
-
- // servers need to contain master hosts and worker hosts, otherwise
the logic task will failover fail.
- List<Server> servers = registryClient.getServerList(NodeType.WORKER);
- servers.addAll(registryClient.getServerList(NodeType.MASTER));
-
- for (ProcessInstance processInstance :
needFailoverProcessInstanceList) {
- if (Constants.NULL.equals(processInstance.getHost())) {
- continue;
- }
-
- List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : validTaskInstanceList) {
- LOGGER.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance, servers);
- }
-
- if (serverStartupTime != null && processInstance.getRestartTime()
!= null
- &&
processInstance.getRestartTime().after(serverStartupTime)) {
- continue;
- }
-
- LOGGER.info("failover process instance id: {}",
processInstance.getId());
- ProcessInstanceMetrics.incProcessInstanceFailover();
- //updateProcessInstance host is null and insert into command
- processInstance.setHost(Constants.NULL);
-
processService.processNeedFailoverProcessInstances(processInstance);
- }
-
- failoverTimeCost.stop();
- LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost,
failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
- }
-
- /**
- * Do the worker failover. Will find the
SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP
tasks belong the given worker,
- * and failover these tasks.
- * <p>
- * Note: When we do worker failover, the master will only failover the
processInstance belongs to the current master.
- *
- * @param workerHost worker host
- */
- private void failoverWorker(String workerHost) {
- if (StringUtils.isEmpty(workerHost)) {
- return;
- }
-
- long startTime = System.currentTimeMillis();
- // we query the task instance from cache, so that we can directly
update the cache
- final List<TaskInstance> needFailoverTaskInstanceList =
cacheManager.getAll()
- .stream()
- .flatMap(workflowExecuteRunnable ->
workflowExecuteRunnable.getAllTaskInstances().stream())
- .filter(taskInstance ->
- workerHost.equals(taskInstance.getHost()) &&
ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
- .collect(Collectors.toList());
- final Map<Integer, ProcessInstance> processInstanceCacheMap = new
HashMap<>();
- LOGGER.info("start worker[{}] failover, task list size:{}",
workerHost, needFailoverTaskInstanceList.size());
- final List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
- for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
-
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(),
taskInstance.getId());
- try {
- ProcessInstance processInstance =
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
- if (processInstance == null) {
- processInstance =
cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
- if (processInstance == null) {
- LOGGER.error("failover task instance error,
processInstance {} of taskInstance {} is null",
- taskInstance.getProcessInstanceId(),
taskInstance.getId());
- continue;
- }
- processInstanceCacheMap.put(processInstance.getId(),
processInstance);
- }
-
- // only failover the task owned myself if worker down.
- if (!StringUtils.equalsIgnoreCase(processInstance.getHost(),
localAddress)) {
- continue;
- }
-
- LOGGER.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance,
workerServers);
- } finally {
- LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
- }
- }
- LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost,
System.currentTimeMillis() - startTime);
- }
-
- /**
- * failover task instance
- * <p>
- * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. try to notify local master
- *
- * @param processInstance
- * @param taskInstance
- * @param servers if failover master, servers container master
servers and worker servers; if failover worker, servers contain worker servers.
- */
- private void failoverTaskInstance(@NonNull ProcessInstance
processInstance, TaskInstance taskInstance, List<Server> servers) {
- if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
- LOGGER.info("The taskInstance doesn't need to failover");
- return;
- }
- TaskMetrics.incTaskFailover();
- boolean isMasterTask =
TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
-
- taskInstance.setProcessInstance(processInstance);
-
- if (!isMasterTask) {
- LOGGER.info("The failover taskInstance is not master task");
- TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
-
- if (masterConfig.isKillYarnJobWhenTaskFailover()) {
- // only kill yarn job if exists , the local thread has exited
- ProcessUtils.killYarnJob(taskExecutionContext);
- }
- } else {
- LOGGER.info("The failover taskInstance is a master task");
- }
-
- taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
-
- StateEvent stateEvent = new StateEvent();
- stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- stateEvent.setProcessInstanceId(processInstance.getId());
- stateEvent.setExecutionStatus(taskInstance.getState());
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
- }
-
- /**
- * Get need failover master servers.
- * <p>
- * Query the process instances from database, if the processInstance's
host doesn't exist in registry
- * or the host is the currentServer, then it will need to failover.
- *
- * @return need failover master servers
- */
- private List<String> getNeedFailoverMasterServers() {
- // failover myself && failover dead masters
- List<String> hosts =
processService.queryNeedFailoverProcessInstanceHost();
-
- Iterator<String> iterator = hosts.iterator();
- while (iterator.hasNext()) {
- String host = iterator.next();
- if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
- if (!localAddress.equals(host)) {
- iterator.remove();
- }
- }
- }
- return hosts;
- }
-
- /**
- * task needs failover if task start before server starts
- *
- * @param servers servers, can container master servers or worker servers
- * @param taskInstance task instance
- * @return true if task instance need fail over
- */
- private boolean checkTaskInstanceNeedFailover(List<Server> servers,
TaskInstance taskInstance) {
-
- boolean taskNeedFailover = true;
-
- if (taskInstance == null) {
- LOGGER.error("Master failover task instance error, taskInstance is
null");
- return false;
- }
-
- if (Constants.NULL.equals(taskInstance.getHost())) {
- return false;
- }
-
- if (taskInstance.getState() != null &&
taskInstance.getState().typeIsFinished()) {
- return false;
- }
-
- //now no host will execute this task instance,so no need to failover
the task
- if (taskInstance.getHost() == null) {
- return false;
- }
-
- //if task start after server starts, there is no need to failover the
task.
- if (checkTaskAfterServerStart(servers, taskInstance)) {
- taskNeedFailover = false;
- }
-
- return taskNeedFailover;
- }
-
- /**
- * check task start after the worker server starts.
- *
- * @param servers servers, can contain master servers or worker servers
- * @param taskInstance task instance
- * @return true if task instance start time after server start date
- */
- private boolean checkTaskAfterServerStart(List<Server> servers,
TaskInstance taskInstance) {
- if (StringUtils.isEmpty(taskInstance.getHost())) {
- return false;
- }
- Date serverStartDate = getServerStartupTime(servers,
taskInstance.getHost());
- if (serverStartDate != null) {
- if (taskInstance.getStartTime() == null) {
- return taskInstance.getSubmitTime().after(serverStartDate);
- } else {
- return taskInstance.getStartTime().after(serverStartDate);
- }
- }
- return false;
- }
-
- /**
- * get failover lock path
- *
- * @param nodeType zookeeper node type
- * @return fail over lock path
- */
- private String getFailoverLockPath(NodeType nodeType, String host) {
- switch (nodeType) {
- case MASTER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
- case WORKER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
- default:
- return "";
- }
- }
-
- /**
- * get server startup time
- */
- private Date getServerStartupTime(NodeType nodeType, String host) {
- if (StringUtils.isEmpty(host)) {
- return null;
- }
- List<Server> servers = registryClient.getServerList(nodeType);
- return getServerStartupTime(servers, host);
- }
-
- /**
- * get server startup time
- */
- private Date getServerStartupTime(List<Server> servers, String host) {
- if (CollectionUtils.isEmpty(servers)) {
- return null;
- }
- Date serverStartupTime = null;
- for (Server server : servers) {
- if (host.equals(server.getHost() + Constants.COLON +
server.getPort())) {
- serverStartupTime = server.getCreateTime();
- break;
- }
- }
- return serverStartupTime;
- }
-
- public String getLocalAddress() {
- return localAddress;
- }
-
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
new file mode 100644
index 0000000000..c7e5b4ea13
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+import lombok.NonNull;
+
+@Service
+public class MasterFailoverService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MasterFailoverService.class);
+ private final RegistryClient registryClient;
+ private final MasterConfig masterConfig;
+ private final ProcessService processService;
+ private final String localAddress;
+
+ /**
+ * Creates the service from its collaborators and resolves this master's own address
+ * through {@code NetUtils.getAddr}, using the listen port taken from {@code masterConfig}.
+ *
+ * @param registryClient registry client used for the failover lock and for server lookups
+ * @param masterConfig master configuration (listen port, kill-yarn-job-on-failover switch)
+ * @param processService service used to query and update workflow and task instances
+ */
+ public MasterFailoverService(@NonNull RegistryClient registryClient,
+ @NonNull MasterConfig masterConfig,
+ @NonNull ProcessService processService) {
+ this.registryClient = registryClient;
+ this.masterConfig = masterConfig;
+ this.processService = processService;
+ this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+
+ }
+
+ /**
+ * Looks up the master hosts that still own workflow instances needing failover and fails
+ * each of them over.
+ * <p>
+ * Candidate hosts come from {@code processService.queryNeedFailoverProcessInstanceHost()}.
+ * A host is kept when it equals this master's own address or when the registry has no
+ * MASTER node for it; duplicates are dropped. Nothing happens when no host is left.
+ */
+ @Counted(value = "ds.master.scheduler.failover.check.count")
+ @Timed(value = "ds.master.scheduler.failover.check.time", percentiles =
{0.5, 0.75, 0.95, 0.99}, histogram = true)
+ public void checkMasterFailover() {
+ List<String> needFailoverMasterHosts =
processService.queryNeedFailoverProcessInstanceHost()
+ .stream()
+ // keep this master itself and masters that are no longer present in the registry
+ .filter(host -> localAddress.equals(host) ||
!registryClient.checkNodeExists(host, NodeType.MASTER))
+ .distinct()
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
+ return;
+ }
+ LOGGER.info("Master failover service {} begin to failover hosts:{}",
localAddress, needFailoverMasterHosts);
+
+ for (String needFailoverMasterHost : needFailoverMasterHosts) {
+ failoverMaster(needFailoverMasterHost);
+ }
+ }
+
+ /**
+ * Fails over the given master while holding the registry lock for that host.
+ * <p>
+ * The lock path is {@code REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost}.
+ * An exception raised while locking or failing over is logged and not rethrown, and
+ * {@code releaseLock} is always called afterwards.
+ *
+ * @param masterHost address of the master to fail over
+ */
+ public void failoverMaster(String masterHost) {
+ String failoverPath =
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
+ try {
+ // NOTE(review): getLock sits inside the try, so releaseLock in the finally block also runs
+ // when getLock itself threw - confirm releaseLock tolerates a lock that was never taken.
+ registryClient.getLock(failoverPath);
+ doFailoverMaster(masterHost);
+ } catch (Exception e) {
+ LOGGER.error("Master server failover failed, host:{}", masterHost,
e);
+ } finally {
+ registryClient.releaseLock(failoverPath);
+ }
+ }
+
+ /**
+ * Fails over the workflow instances of the given master together with their task instances.
+ * <p>
+ * The master's startup time is looked up in the registry's MASTER server list, and the
+ * candidate workflow instances come from
+ * {@code processService.queryNeedFailoverProcessInstances(masterHost)}. Each candidate is
+ * checked with {@code checkProcessInstanceNeedFailover}; for those that pass, every valid
+ * task instance that is not yet finished is failed over, then the instance's host is set to
+ * {@code Constants.NULL} and the instance is handed to
+ * {@code processService.processNeedFailoverProcessInstances}.
+ *
+ * @param masterHost master host
+ */
+ private void doFailoverMaster(@NonNull String masterHost) {
+ LOGGER.info("Master[{}] failover starting, need to failover process",
masterHost);
+ StopWatch failoverTimeCost = StopWatch.createStarted();
+
+ // Empty when the master is not found in the registry's MASTER server list.
+ Optional<Date> masterStartupTimeOptional =
+
getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
+ List<ProcessInstance> needFailoverProcessInstanceList =
+ processService.queryNeedFailoverProcessInstances(masterHost);
+
+ LOGGER.info(
+ "Master[{}] failover there are {} workflowInstance may need to
failover, will do a deep check, workflowInstanceIds: {}",
+ masterHost,
+ needFailoverProcessInstanceList.size(),
+
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
+
+ for (ProcessInstance processInstance :
needFailoverProcessInstanceList) {
+ try {
+ // The workflow instance id stays in the logging MDC until the finally block below.
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ LOGGER.info("WorkflowInstance failover starting");
+ if
(!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance))
{
+ LOGGER.info("WorkflowInstance doesn't need to failover");
+ continue;
+ }
+ int processInstanceId = processInstance.getId();
+ List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processInstanceId);
+ for (TaskInstance taskInstance : taskInstanceList) {
+ try {
+ // The task instance id stays in the logging MDC until the inner finally block.
+ LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
+ LOGGER.info("TaskInstance failover starting");
+ if (!checkTaskInstanceNeedFailover(taskInstance)) {
+ LOGGER.info("The taskInstance doesn't need to
failover");
+ continue;
+ }
+ failoverTaskInstance(processInstance, taskInstance);
+ LOGGER.info("TaskInstance failover finished");
+ } finally {
+ LoggerUtils.removeTaskInstanceIdMDC();
+ }
+ }
+
+ ProcessInstanceMetrics.incProcessInstanceFailover();
+ // Set the host to Constants.NULL to mark this processInstance as already failed over,
+ // and insert a failover command
+ processInstance.setHost(Constants.NULL);
+
processService.processNeedFailoverProcessInstances(processInstance);
+ LOGGER.info("WorkflowInstance failover finished");
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
+ }
+
+ failoverTimeCost.stop();
+ LOGGER.info("Master[{}] failover finished, useTime:{}ms",
+ masterHost,
+ failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Returns the create time of the first server whose
+ * {@code getHost() + Constants.COLON + getPort()} equals {@code host}.
+ *
+ * @param servers servers to search
+ * @param host address to look for
+ * @return the matching server's create time; empty when {@code CollectionUtils.isEmpty(servers)}
+ * holds, when no server matches, or when the matching server's create time is null
+ */
+ private Optional<Date> getServerStartupTime(List<Server> servers, String
host) {
+ if (CollectionUtils.isEmpty(servers)) {
+ return Optional.empty();
+ }
+ Date serverStartupTime = null;
+ for (Server server : servers) {
+ if (host.equals(server.getHost() + Constants.COLON +
server.getPort())) {
+ serverStartupTime = server.getCreateTime();
+ break;
+ }
+ }
+ return Optional.ofNullable(serverStartupTime);
+ }
+
+ /**
+ * Fails over a single task instance.
+ * <p>
+ * 1. For a task that is not a master task, builds its TaskExecutionContext and, when
+ * {@code masterConfig.isKillYarnJobWhenTaskFailover()} is set, calls {@code ProcessUtils.killYarnJob}.
+ * 2. Sets the task state to NEED_FAULT_TOLERANCE and its flag to {@code Flag.NO}, then saves it
+ * through {@code processService.saveTaskInstance}.
+ * <p>
+ * No state event or other notification is sent from this method.
+ *
+ * @param processInstance workflow instance the task belongs to; also attached to the task instance
+ * @param taskInstance task instance to fail over
+ */
+ private void failoverTaskInstance(@NonNull ProcessInstance
processInstance, @NonNull TaskInstance taskInstance) {
+ TaskMetrics.incTaskFailover();
+ boolean isMasterTask =
TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
+ taskInstance.setProcessInstance(processInstance);
+
+ if (!isMasterTask) {
+ LOGGER.info("The failover taskInstance is not master task");
+ TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
+
+ if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+ // only kill the yarn job if it exists; the local thread has exited
+ LOGGER.info("TaskInstance failover begin kill the task related
yarn job");
+ ProcessUtils.killYarnJob(taskExecutionContext);
+ }
+ } else {
+ LOGGER.info("The failover taskInstance is a master task");
+ }
+
+ taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+ taskInstance.setFlag(Flag.NO);
+ processService.saveTaskInstance(taskInstance);
+ }
+
+ /**
+ * Tells whether a task instance still has to be failed over.
+ *
+ * @param taskInstance task instance to check
+ * @return false when the task has a state and that state is a finished one; true otherwise,
+ * including when the state is null
+ */
+ private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance
taskInstance) {
+ if (taskInstance.getState() != null &&
taskInstance.getState().typeIsFinished()) {
+ // The task is already finished, so this task instance does not need to be failed over
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Tells whether a workflow instance of the master being failed over still needs failover.
+ *
+ * @param beFailoveredMasterStartupTimeOptional startup time of the master being failed over,
+ * empty when none was found for it
+ * @param processInstance workflow instance to check
+ * @return false when the instance's host is already {@code Constants.NULL} or when the instance
+ * started after the master's startup time; true otherwise
+ */
+ private boolean checkProcessInstanceNeedFailover(Optional<Date>
beFailoveredMasterStartupTimeOptional,
+ @NonNull ProcessInstance
processInstance) {
+ // The process has already been failed over. Master failover runs while holding a lock,
+ // so the host cannot be set concurrently.
+ if (Constants.NULL.equals(processInstance.getHost())) {
+ return false;
+ }
+ if (!beFailoveredMasterStartupTimeOptional.isPresent()) {
+ // the master is not active, so all of its processInstances can be failed over
+ return true;
+ }
+ Date beFailoveredMasterStartupTime =
beFailoveredMasterStartupTimeOptional.get();
+
+ // NOTE(review): getStartTime() is dereferenced without a null check - confirm it is always set.
+ if
(processInstance.getStartTime().after(beFailoveredMasterStartupTime)) {
+ // The processInstance is newly created
+ return false;
+ }
+
+ return true;
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
new file mode 100644
index 0000000000..ec126a3ec3
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import lombok.NonNull;
+
+/**
+ * Handles failover of the taskInstances that were running on a worker which needs failover.
+ * <p>
+ * Only taskInstances held in the local {@link ProcessInstanceExecCacheManager} are considered,
+ * and only those whose processInstance host equals the address of the current master.
+ */
+@Service
+public class WorkerFailoverService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerFailoverService.class);
+
+    private final RegistryClient registryClient;
+    private final MasterConfig masterConfig;
+    private final ProcessService processService;
+    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
+    private final ProcessInstanceExecCacheManager cacheManager;
+    /** Address of the current master, built from the configured listen port. */
+    private final String localAddress;
+
+    /**
+     * Create the service; every collaborator is required (non-null).
+     *
+     * @param registryClient            used to list the currently registered workers
+     * @param masterConfig              supplies the listen port and the kill-yarn-job switch
+     * @param processService            used to persist the failed-over taskInstance
+     * @param workflowExecuteThreadPool receives the task state change event after failover
+     * @param cacheManager              local cache of the running workflow execute runnables
+     */
+    public WorkerFailoverService(@NonNull RegistryClient registryClient,
+                                 @NonNull MasterConfig masterConfig,
+                                 @NonNull ProcessService processService,
+                                 @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                                 @NonNull ProcessInstanceExecCacheManager cacheManager) {
+        this.registryClient = registryClient;
+        this.masterConfig = masterConfig;
+        this.processService = processService;
+        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+        this.cacheManager = cacheManager;
+        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+    }
+
+    /**
+     * Do the worker failover. Finds the cached taskInstances whose host is the given worker and
+     * whose state satisfies {@code ExecutionStatus.isNeedFailoverWorkflowInstanceState} (documented
+     * by the original author as
+     * SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP),
+     * and fails over each of them that passes the deep check.
+     * <p>
+     * Note: When we do worker failover, the master will only failover the taskInstances whose
+     * processInstance belongs to the current master.
+     * <p>
+     * A failure while handling one taskInstance is logged and does not stop the remaining ones.
+     *
+     * @param workerHost worker host, compared against {@code host:port} of the registered workers
+     */
+    public void failoverWorker(@NonNull String workerHost) {
+        LOGGER.info("Worker[{}] failover starting", workerHost);
+        final StopWatch failoverTimeCost = StopWatch.createStarted();
+
+        // Empty when the worker is not in the registry's worker list
+        final Optional<Date> needFailoverWorkerStartTime =
+            getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
+
+        // we query the task instance from cache, so that we can directly update the cache
+        final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
+        if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
+            LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
+            return;
+        }
+        LOGGER.info(
+            "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
+            workerHost,
+            needFailoverTaskInstanceList.size(),
+            needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
+        // Per-call lookup cache: processInstanceId -> processInstance taken from the runnable cache
+        final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
+        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
+            try {
+                // computeIfAbsent stores nothing when the function returns null, so a missing
+                // runnable simply yields a null processInstance here
+                ProcessInstance processInstance =
+                    processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> {
+                        WorkflowExecuteRunnable workflowExecuteRunnable =
+                            cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
+                        if (workflowExecuteRunnable == null) {
+                            return null;
+                        }
+                        return workflowExecuteRunnable.getProcessInstance();
+                    });
+                if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
+                    LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
+                    continue;
+                }
+                LOGGER.info(
+                    "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
+                    workerHost);
+                failoverTaskInstance(processInstance, taskInstance);
+                LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
+            } catch (Exception ex) {
+                // Best effort: keep going with the other taskInstances, but report the failure
+                // at ERROR level so that it is not lost among the informational messages
+                LOGGER.error("Worker[{}] failover taskInstance occur exception", workerHost, ex);
+            } finally {
+                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+            }
+        }
+        failoverTimeCost.stop();
+        LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
+            workerHost,
+            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * failover task instance
+     * <p>
+     * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
+     * 2. change task state to NEED_FAULT_TOLERANCE, set its flag to NO and save it.
+     * 3. notify the local master by submitting a TASK_STATE_CHANGE state event
+     *
+     * @param processInstance the processInstance the taskInstance belongs to
+     * @param taskInstance    the taskInstance to failover; it is modified and saved
+     */
+    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+
+        TaskMetrics.incTaskFailover();
+        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
+        taskInstance.setProcessInstance(processInstance);
+
+        if (!isMasterTask) {
+            LOGGER.info("The failover taskInstance is not master task");
+            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(processInstance)
+                .create();
+
+            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+                // only kill yarn job if exists , the local thread has exited
+                LOGGER.info("TaskInstance failover begin kill the task related yarn job");
+                ProcessUtils.killYarnJob(taskExecutionContext);
+            }
+        } else {
+            LOGGER.info("The failover taskInstance is a master task");
+        }
+
+        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+        taskInstance.setFlag(Flag.NO);
+        processService.saveTaskInstance(taskInstance);
+
+        // Tell the workflow execute thread pool about the new task state
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        stateEvent.setProcessInstanceId(processInstance.getId());
+        stateEvent.setExecutionStatus(taskInstance.getState());
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * Deep check whether a taskInstance needs failover.
+     * <p>
+     * The taskInstance is skipped when its processInstance or the taskInstance itself is null,
+     * when the processInstance is not hosted by the current master, when the taskInstance is
+     * already finished, or when the worker is registered again and the taskInstance was
+     * submitted after the worker's startup time.
+     *
+     * @param needFailoverWorkerStartTime startup time of the worker; empty if it is not registered
+     * @param processInstance             processInstance of the taskInstance, may be null
+     * @param taskInstance                the taskInstance to check
+     * @return true if task instance need fail over
+     */
+    private boolean checkTaskInstanceNeedFailover(Optional<Date> needFailoverWorkerStartTime,
+                                                  @Nullable ProcessInstance processInstance,
+                                                  TaskInstance taskInstance) {
+        if (processInstance == null) {
+            // This case shouldn't happen.
+            LOGGER.error(
+                "Failover task instance error, cannot find the related processInstance form memory, this case shouldn't happened");
+            return false;
+        }
+        if (taskInstance == null) {
+            // This case shouldn't happen.
+            LOGGER.error("Master failover task instance error, taskInstance is null, this case shouldn't happened");
+            return false;
+        }
+        // only failover the task owned myself if worker down.
+        if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
+            LOGGER.error(
+                "Master failover task instance error, the taskInstance's processInstance's host: {} is not the current master: {}",
+                processInstance.getHost(),
+                localAddress);
+            return false;
+        }
+        if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
+            // The taskInstance is already finished, doesn't need to failover
+            LOGGER.info("The task is already finished, doesn't need to failover");
+            return false;
+        }
+        if (!needFailoverWorkerStartTime.isPresent()) {
+            // The worker is still down
+            return true;
+        }
+        // The worker is active, may already send some new task to it
+        if (taskInstance.getSubmitTime() != null && taskInstance.getSubmitTime()
+            .after(needFailoverWorkerStartTime.get())) {
+            LOGGER.info(
+                "The taskInstance's submitTime: {} is after the need failover worker's start time: {}, the taskInstance is newly submit, it doesn't need to failover",
+                taskInstance.getSubmitTime(),
+                needFailoverWorkerStartTime.get());
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Collect the failover candidates for the given worker from the local runnable cache.
+     *
+     * @param failoverWorkerHost host of the worker which needs failover
+     * @return the cached taskInstances whose host equals the worker host and whose state
+     *         satisfies {@code ExecutionStatus.isNeedFailoverWorkflowInstanceState}
+     */
+    private List<TaskInstance> getNeedFailoverTaskInstance(@NonNull String failoverWorkerHost) {
+        // we query the task instance from cache, so that we can directly update the cache
+        return cacheManager.getAll()
+            .stream()
+            .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
+            // A taskInstance whose host is not set yet (e.g. still dispatching) never equals
+            // the worker host, so it is not selected here
+            .filter(taskInstance -> failoverWorkerHost.equals(taskInstance.getHost())
+                && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Look up the startup time of the server registered under the given host.
+     *
+     * @param servers registered servers, may be null or empty
+     * @param host    address in {@code host:port} form
+     * @return create time of the first server whose {@code host:port} equals the given host;
+     *         empty when there is no such server or its create time is null
+     */
+    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
+        if (CollectionUtils.isEmpty(servers)) {
+            return Optional.empty();
+        }
+        Date serverStartupTime = null;
+        for (Server server : servers) {
+            if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
+                serverStartupTime = server.getCreateTime();
+                break;
+            }
+        }
+        return Optional.ofNullable(serverStartupTime);
+    }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index b2c7d63cd7..55dd236f71 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -52,7 +52,7 @@ public class ExecutionContextTestUtils {
TaskExecuteRequestCommand requestCommand = new
TaskExecuteRequestCommand(context);
Command command = requestCommand.convert2Command();
- ExecutionContext executionContext = new ExecutionContext(command,
ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(command,
ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(port)));
return executionContext;
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index fdd79552b1..1ee890faf2 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -70,7 +70,7 @@ public class NettyExecutorManagerTest {
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
- ExecutionContext executionContext = new
ExecutionContext(toCommand(context), ExecutorType.WORKER);
+ ExecutionContext executionContext = new
ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
@@ -89,7 +89,7 @@ public class NettyExecutorManagerTest {
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
- ExecutionContext executionContext = new
ExecutionContext(toCommand(context), ExecutorType.WORKER);
+ ExecutionContext executionContext = new
ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(4444)));
nettyExecutorManager.execute(executionContext);
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 98bf514730..44e5a382f6 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -26,13 +26,14 @@ import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -98,9 +99,17 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
- failoverService = new FailoverService(registryClient, masterConfig,
processService, workflowExecuteThreadPool, cacheManager);
+ MasterFailoverService masterFailoverService =
+ new MasterFailoverService(registryClient, masterConfig,
processService);
+ WorkerFailoverService workerFailoverService = new
WorkerFailoverService(registryClient,
+ masterConfig,
+ processService,
+ workflowExecuteThreadPool,
+ cacheManager);
+
+ failoverService = new FailoverService(masterFailoverService,
workerFailoverService);
- testMasterHost = failoverService.getLocalAddress();
+ testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
String ip = testMasterHost.split(":")[0];
int port = Integer.valueOf(testMasterHost.split(":")[1]);
Assert.assertEquals(masterPort, port);
@@ -118,6 +127,7 @@ public class FailoverServiceTest {
processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setHost(testMasterHost);
+ processInstance.setStartTime(new Date());
processInstance.setRestartTime(new Date());
processInstance.setHistoryCmd("xxx");
processInstance.setCommandType(CommandType.STOP);
@@ -154,16 +164,10 @@ public class FailoverServiceTest {
given(registryClient.getServerList(NodeType.WORKER)).willReturn(new
ArrayList<>(Arrays.asList(workerServer)));
given(registryClient.getServerList(NodeType.MASTER)).willReturn(new
ArrayList<>(Arrays.asList(masterServer)));
- ReflectionTestUtils.setField(failoverService, "registryClient",
registryClient);
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
}
- @Test
- public void checkMasterFailoverTest() {
- failoverService.checkMasterFailover();
- }
-
@Test
public void failoverMasterTest() {
processInstance.setHost(Constants.NULL);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index f6eab1befa..fd33eb115a 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -22,6 +22,7 @@ import
org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@@ -62,7 +63,8 @@ import
org.springframework.transaction.annotation.Transactional;
public interface ProcessService {
@Transactional
- ProcessInstance handleCommand(String host, Command command) throws
CronParseException;
+ ProcessInstance handleCommand(String host, Command command)
+ throws CronParseException, CodeGenerateUtils.CodeGenerateException;
void moveToErrorCommand(Command command, String message);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index f9d0782f95..5024a944f1 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -128,7 +128,6 @@ import
org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
@@ -276,6 +275,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private CuringParamsService curingGlobalParamsService;
+ @Autowired
+ private ProcessService processService;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
*
@@ -285,7 +287,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
@Transactional
- public ProcessInstance handleCommand(String host, Command command) throws
CronParseException {
+ public ProcessInstance handleCommand(String host, Command command) throws
CronParseException,
+ CodeGenerateException {
ProcessInstance processInstance = constructProcessInstance(command,
host);
// cannot construct process instance, return null
if (processInstance == null) {
@@ -762,7 +765,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition
processDefinition,
Command command,
- Map<String, String>
cmdParam) {
+ Map<String, String>
cmdParam) throws CodeGenerateException {
ProcessInstance processInstance = new
ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
@@ -922,7 +925,8 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host
* @return process instance
*/
- protected ProcessInstance constructProcessInstance(Command command, String
host) throws CronParseException {
+ protected ProcessInstance constructProcessInstance(Command command, String
host)
+ throws CronParseException, CodeGenerateException {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
@@ -1028,6 +1032,7 @@ public class ProcessServiceImpl implements ProcessService
{
case RECOVER_TOLERANCE_FAULT_PROCESS:
// recover tolerance fault process
processInstance.setRecovery(Flag.YES);
+ processInstance.setRunTimes(runTime + 1);
runStatus = processInstance.getState();
break;
case COMPLEMENT_DATA:
@@ -1273,11 +1278,15 @@ public class ProcessServiceImpl implements
ProcessService {
while (retryTimes <= commitRetryTimes) {
try {
// submit task to db
- task =
SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance,
taskInstance);
+ // Only want to use transaction here
+ task = processService.submitTask(processInstance,
taskInstance);
if (task != null && task.getId() != 0) {
break;
}
- logger.error("task commit to db failed , taskId {} has already
retry {} times, please check the database", taskInstance.getId(), retryTimes);
+ logger.error(
+ "task commit to db failed , taskId {} has already retry {}
times, please check the database",
+ taskInstance.getId(),
+ retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to db failed", e);
@@ -1299,13 +1308,17 @@ public class ProcessServiceImpl implements
ProcessService {
@Override
@Transactional
public TaskInstance submitTask(ProcessInstance processInstance,
TaskInstance taskInstance) {
- logger.info("start submit task : {}, processInstance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(),
processInstance.getState());
+ logger.info("Start save taskInstance to database : {}, processInstance
id:{}, state: {}",
+ taskInstance.getName(),
+ taskInstance.getProcessInstanceId(),
+ processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance,
processInstance);
if (task == null) {
- logger.error("end submit task to db error, task name:{}, process
id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(),
processInstance.getState());
+ logger.error("Save taskInstance to db error, task name:{}, process
id:{} state: {} ",
+ taskInstance.getName(),
+ taskInstance.getProcessInstance(),
+ processInstance.getState());
return null;
}
@@ -1313,8 +1326,13 @@ public class ProcessServiceImpl implements
ProcessService {
createSubWorkProcess(processInstance, task);
}
- logger.info("end submit task to db successfully:{} {} state:{}
complete, instance id:{} state: {} ",
- taskInstance.getId(), taskInstance.getName(), task.getState(),
processInstance.getId(), processInstance.getState());
+ logger.info(
+ "End save taskInstance to db successfully:{}, taskInstanceName:
{}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
+ taskInstance.getId(),
+ taskInstance.getName(),
+ task.getState(),
+ processInstance.getId(),
+ processInstance.getState());
return task;
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index 0aeec4609d..823ec81d3c 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -211,16 +211,45 @@ public class TaskPriority implements
Comparable<TaskPriority> {
}
TaskPriority that = (TaskPriority) o;
return processInstancePriority == that.processInstancePriority
- && processInstanceId == that.processInstanceId
- && taskInstancePriority == that.taskInstancePriority
- && taskId == that.taskId
- && taskGroupPriority == that.taskGroupPriority
- && Objects.equals(groupName, that.groupName);
+ && processInstanceId == that.processInstanceId
+ && taskInstancePriority == that.taskInstancePriority
+ && taskId == that.taskId
+ && taskGroupPriority == that.taskGroupPriority
+ && Objects.equals(groupName, that.groupName);
}
@Override
public int hashCode() {
- return Objects.hash(processInstancePriority, processInstanceId,
taskInstancePriority, taskId, taskGroupPriority, groupName);
+ return Objects.hash(processInstancePriority,
+ processInstanceId,
+ taskInstancePriority,
+ taskId,
+ taskGroupPriority,
+ groupName);
}
+ /**
+  * Render the priority fields, the task execution context, the group name, the context and
+  * the checkpoint in the form {@code TaskPriority{name=value, ...}}.
+  *
+  * @return the textual representation of this instance
+  */
+ @Override
+ public String toString() {
+     final StringBuilder text = new StringBuilder("TaskPriority{");
+     text.append("processInstancePriority=").append(processInstancePriority);
+     text.append(", processInstanceId=").append(processInstanceId);
+     text.append(", taskInstancePriority=").append(taskInstancePriority);
+     text.append(", taskId=").append(taskId);
+     text.append(", taskExecutionContext=").append(taskExecutionContext);
+     // groupName is the only value wrapped in single quotes
+     text.append(", groupName='").append(groupName).append('\'');
+     text.append(", context=").append(context);
+     text.append(", checkpoint=").append(checkpoint);
+     text.append(", taskGroupPriority=").append(taskGroupPriority);
+     return text.append('}').toString();
+ }
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 26b9e66045..f655e3b2af 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -285,7 +286,7 @@ public class ProcessServiceTest {
}
@Test
- public void testHandleCommand() throws CronParseException {
+ public void testHandleCommand() throws CronParseException,
CodeGenerateUtils.CodeGenerateException {
//cannot construct process instance, return null;
String host = "127.0.0.1";
@@ -462,7 +463,7 @@ public class ProcessServiceTest {
}
@Test(expected = ServiceException.class)
- public void testDeleteNotExistCommand() throws CronParseException {
+ public void testDeleteNotExistCommand() throws CronParseException,
CodeGenerateUtils.CodeGenerateException {
String host = "127.0.0.1";
int definitionVersion = 1;
long definitionCode = 123;