This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch 2.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0-prepare by this push:
new 6ffd019 [cherry-pick][6471-6502]cherry-pick 6471 && 6502 to
2.0-prepare (#6508)
6ffd019 is described below
commit 6ffd019683c84b61378a56895f2903ec460311c4
Author: OS <[email protected]>
AuthorDate: Tue Oct 12 17:32:04 2021 +0800
[cherry-pick][6471-6502]cherry-pick 6471 && 6502 to 2.0-prepare (#6508)
---
.../api/service/impl/ExecutorServiceImpl.java | 16 ++-
.../dolphinscheduler/dao/entity/Command.java | 41 ++++++-
.../dolphinscheduler/dao/mapper/CommandMapper.java | 7 --
.../dolphinscheduler/dao/mapper/CommandMapper.xml | 10 --
.../dao/mapper/CommandMapperTest.java | 4 +-
.../server/master/config/MasterConfig.java | 12 ++
.../master/runner/MasterSchedulerService.java | 28 ++++-
.../src/main/resources/master.properties | 3 +
.../service/process/ProcessService.java | 133 +++++++++------------
.../service/quartz/ProcessScheduleJob.java | 1 +
.../service/process/ProcessServiceTest.java | 48 +++++---
sql/dolphinscheduler_h2.sql | 4 +
sql/dolphinscheduler_mysql.sql | 34 +++---
sql/dolphinscheduler_postgre.sql | 63 +++++-----
14 files changed, 235 insertions(+), 169 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 5042a03..fe5f910 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -283,13 +283,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams);
+ result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), processDefinition.getVersion(),
CommandType.REPEAT_RUNNING, startParams);
break;
case RECOVER_SUSPENDED_PROCESS:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS,
startParams);
+ result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), processDefinition.getVersion(),
CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break;
case START_FAILURE_TASK_PROCESS:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS,
startParams);
+ result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), processDefinition.getVersion(),
CommandType.START_FAILURE_TASK_PROCESS, startParams);
break;
case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -409,10 +409,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* @param loginUser login user
* @param instanceId instance id
* @param processDefinitionCode process definition code
+ * @param processVersion process definition version
* @param commandType command type
* @return insert result code
*/
- private Map<String, Object> insertCommand(User loginUser, Integer
instanceId, long processDefinitionCode, CommandType commandType, String
startParams) {
+ private Map<String, Object> insertCommand(User loginUser, Integer
instanceId, long processDefinitionCode, int processVersion, CommandType
commandType, String startParams) {
Map<String, Object> result = new HashMap<>();
//To add startParams only when repeat running is needed
@@ -427,6 +428,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
command.setProcessDefinitionCode(processDefinitionCode);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(loginUser.getId());
+ command.setProcessDefinitionVersion(processVersion);
+ command.setProcessInstanceId(instanceId);
if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND,
processDefinitionCode);
@@ -545,6 +548,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
command.setWorkerGroup(workerGroup);
command.setEnvironmentCode(environmentCode);
command.setDryRun(dryRun);
+ ProcessDefinition processDefinition =
processService.findProcessDefinitionByCode(processDefineCode);
+ if (processDefinition != null) {
+
command.setProcessDefinitionVersion(processDefinition.getVersion());
+ }
+ command.setProcessInstanceId(0);
Date start = null;
Date end = null;
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index b1ed217..ae2ff62 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -132,6 +132,12 @@ public class Command {
@TableField("dry_run")
private int dryRun;
+ @TableField("process_instance_id")
+ private int processInstanceId;
+
+ @TableField("process_definition_version")
+ private int processDefinitionVersion;
+
public Command() {
this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE;
@@ -152,7 +158,10 @@ public class Command {
String workerGroup,
Long environmentCode,
Priority processInstancePriority,
- int dryRun) {
+ int dryRun,
+ int processInstanceId,
+ int processDefinitionVersion
+ ) {
this.commandType = commandType;
this.executorId = executorId;
this.processDefinitionCode = processDefinitionCode;
@@ -168,6 +177,8 @@ public class Command {
this.environmentCode = environmentCode;
this.processInstancePriority = processInstancePriority;
this.dryRun = dryRun;
+ this.processInstanceId = processInstanceId;
+ this.processDefinitionVersion = processDefinitionVersion;
}
public TaskDependType getTaskDependType() {
@@ -298,6 +309,22 @@ public class Command {
this.dryRun = dryRun;
}
+ public int getProcessInstanceId() {
+ return processInstanceId;
+ }
+
+ public void setProcessInstanceId(int processInstanceId) {
+ this.processInstanceId = processInstanceId;
+ }
+
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
+ }
+
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -353,8 +380,13 @@ public class Command {
if (processInstancePriority != command.processInstancePriority) {
return false;
}
+ if (processInstanceId != command.processInstanceId) {
+ return false;
+ }
+ if (processDefinitionVersion != command.getProcessDefinitionVersion())
{
+ return false;
+ }
return !(updateTime != null ? !updateTime.equals(command.updateTime) :
command.updateTime != null);
-
}
@Override
@@ -375,6 +407,8 @@ public class Command {
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() :
0);
result = 31 * result + (environmentCode != null ?
environmentCode.hashCode() : 0);
result = 31 * result + dryRun;
+ result = 31 * result + processInstanceId;
+ result = 31 * result + processDefinitionVersion;
return result;
}
@@ -397,7 +431,10 @@ public class Command {
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentCode='" + environmentCode + '\''
+ ", dryRun='" + dryRun + '\''
+ + ", processInstanceId='" + processInstanceId + '\''
+ + ", processDefinitionVersion='" + processDefinitionVersion +
'\''
+ '}';
}
+
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index 2bbfb4b..2291384 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -31,13 +31,6 @@ import java.util.List;
*/
public interface CommandMapper extends BaseMapper<Command> {
-
- /**
- * get one command
- * @return command
- */
- Command getOneToRun();
-
/**
* count command state
* @param userId userId
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index 5b2d6b4..b0ea477 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -18,16 +18,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper">
- <select id="getOneToRun"
resultType="org.apache.dolphinscheduler.dao.entity.Command">
- select cmd.id, cmd.command_type, cmd.process_definition_code,
cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
- cmd.warning_type, cmd.warning_group_id, cmd.schedule_time,
cmd.start_time, cmd.executor_id, cmd.update_time,
- cmd.process_instance_priority, cmd.worker_group, cmd.environment_code,
cmd.dry_run
- from t_ds_command cmd
- join t_ds_process_definition definition on cmd.process_definition_code
= definition.code
- where definition.release_state = 1 AND definition.flag = 1
- order by cmd.update_time asc
- limit 1
- </select>
<select id="countCommandState"
resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
select cmd.command_type as command_type, count(1) as count
from t_ds_command cmd, t_ds_process_definition process
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index dc9dbee..0266ac0 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -147,7 +147,7 @@ public class CommandMapperTest {
createCommand(CommandType.START_PROCESS, processDefinition.getCode());
- Command actualCommand = commandMapper.getOneToRun();
+ List<Command> actualCommand = commandMapper.queryCommandPage(1,0);
assertNotNull(actualCommand);
}
@@ -259,6 +259,8 @@ public class CommandMapperTest {
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ command.setProcessInstanceId(0);
+ command.setProcessDefinitionVersion(0);
commandMapper.insert(command);
return command;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 6c2e2a1..124eceb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -60,6 +60,9 @@ public class MasterConfig {
@Value("${master.reserved.memory:0.3}")
private double masterReservedMemory;
+ @Value("${master.cache.process.definition:true}")
+ private boolean masterCacheProcessDefinition;
+
public int getListenPort() {
return listenPort;
}
@@ -150,4 +153,13 @@ public class MasterConfig {
public void setStateWheelInterval(int stateWheelInterval) {
this.stateWheelInterval = stateWheelInterval;
}
+
+ public boolean getMasterCacheProcessDefinition() {
+ return masterCacheProcessDefinition;
+ }
+
+ public void setMasterCacheProcessDefinition(boolean
masterCacheProcessDefinition) {
+ this.masterCacheProcessDefinition = masterCacheProcessDefinition;
+ }
+
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index faa4eb0..803ba09 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@@ -34,6 +35,7 @@ import
org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
@@ -92,11 +94,26 @@ public class MasterSchedulerService extends Thread {
*/
private ThreadPoolExecutor masterExecService;
-
+ /**
+ * process instance execution list
+ */
private ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceExecMaps;
+ /**
+ * process timeout check list
+ */
ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new
ConcurrentHashMap<>();
+
+ /**
+ * task timeout check list
+ */
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new
ConcurrentHashMap<>();
+ /**
+ * key:code-version
+ * value: processDefinition
+ */
+ HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new
HashMap<>();
+
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@@ -112,7 +129,6 @@ public class MasterSchedulerService extends Thread {
taskTimeoutCheckList,
this.processInstanceExecMaps,
masterConfig.getStateWheelInterval() *
Constants.SLEEP_TIME_MILLIS);
-
}
@Override
@@ -165,7 +181,6 @@ public class MasterSchedulerService extends Thread {
*/
private void scheduleProcess() throws Exception {
- int activeCount = masterExecService.getActiveCount();
// make sure to scan and delete command table in one transaction
Command command = findOneCommand();
if (command != null) {
@@ -173,7 +188,12 @@ public class MasterSchedulerService extends Thread {
try {
ProcessInstance processInstance =
processService.handleCommand(logger,
getLocalAddress(),
- this.masterConfig.getMasterExecThreads() -
activeCount, command);
+ command,
+ processDefinitionCacheMaps);
+ if (!masterConfig.getMasterCacheProcessDefinition()
+ && processDefinitionCacheMaps.size() > 0) {
+ processDefinitionCacheMaps.clear();
+ }
if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new
WorkflowExecuteThread(
processInstance
diff --git a/dolphinscheduler-server/src/main/resources/master.properties
b/dolphinscheduler-server/src/main/resources/master.properties
index cc45622..fbe8f6e 100644
--- a/dolphinscheduler-server/src/main/resources/master.properties
+++ b/dolphinscheduler-server/src/main/resources/master.properties
@@ -39,6 +39,9 @@
# master commit task interval, the unit is millisecond
#master.task.commit.interval=1000
+# master cache process definition, default: true
+#master.cache.process.definition=true
+
# master max cpuload avg, only higher than the system cpu load average, master
server can schedule. default value -1: the number of cpu cores * 2
#master.max.cpuload.avg=-1
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 3c3b41d..a92302e 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -208,13 +208,12 @@ public class ProcessService {
*
* @param logger logger
* @param host host
- * @param validThreadNum validThreadNum
* @param command found command
+ * @param processDefinitionCacheMaps process definition cache, key: code-version, value: processDefinition
* @return process instance
*/
- @Transactional(rollbackFor = Exception.class)
- public ProcessInstance handleCommand(Logger logger, String host, int
validThreadNum, Command command) {
- ProcessInstance processInstance = constructProcessInstance(command,
host);
+ public ProcessInstance handleCommand(Logger logger, String host, Command
command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
+ ProcessInstance processInstance = constructProcessInstance(command,
host, processDefinitionCacheMaps);
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}",
command);
@@ -235,7 +234,6 @@ public class ProcessService {
* @param command command
* @param message message
*/
- @Transactional(rollbackFor = Exception.class)
public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand);
@@ -287,15 +285,6 @@ public class ProcessService {
}
/**
- * find one command from queue list
- *
- * @return command
- */
- public Command findOneCommand() {
- return commandMapper.getOneToRun();
- }
-
- /**
* get command page
*
* @param pageSize
@@ -547,7 +536,9 @@ public class ProcessService {
processInstance.getWorkerGroup(),
processInstance.getEnvironmentCode(),
processInstance.getProcessInstancePriority(),
- processInstance.getDryRun()
+ processInstance.getDryRun(),
+ processInstance.getId(),
+ processInstance.getProcessDefinitionVersion()
);
saveCommand(command);
return;
@@ -744,90 +735,74 @@ public class ProcessService {
/**
* construct process instance according to one command.
*
- * @param command command
- * @param host host
+ * @param command command
+ * @param host host
+ * @param processDefinitionCacheMaps process definition cache, key: code-version, value: processDefinition
* @return process instance
*/
- private ProcessInstance constructProcessInstance(Command command, String
host) {
+ private ProcessInstance constructProcessInstance(Command command, String
host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
ProcessInstance processInstance;
+ ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
- Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
-
- ProcessDefinition processDefinition =
getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam);
+ String key = String.format("%d-%d",
command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
+ if (processDefinitionCacheMaps.containsKey(key)) {
+ processDefinition = processDefinitionCacheMaps.get(key);
+ } else {
+ processDefinition =
this.findProcessDefinition(command.getProcessDefinitionCode(),
command.getProcessDefinitionVersion());
+ if (processDefinition != null) {
+ processDefinitionCacheMaps.put(key, processDefinition);
+ }
+ }
if (processDefinition == null) {
logger.error("cannot find the work process define! define code :
{}", command.getProcessDefinitionCode());
return null;
}
-
+ Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
+ int processInstanceId = command.getProcessInstanceId();
+ if (processInstanceId == 0) {
+ processInstance = generateNewProcessInstance(processDefinition,
command, cmdParam);
+ } else {
+ processInstance =
this.findProcessInstanceDetailById(processInstanceId);
+ if (processInstance == null) {
+ return processInstance;
+ }
+ }
if (cmdParam != null) {
- int processInstanceId = 0;
- // recover from failure or pause tasks
- if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
- String processId =
cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
- processInstanceId = Integer.parseInt(processId);
- if (processInstanceId == 0) {
- logger.error("command parameter is error, [
ProcessInstanceId ] is 0");
- return null;
- }
- } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
- // sub process map
- String pId = cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS);
- processInstanceId = Integer.parseInt(pId);
- } else if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
- // waiting thread command
- String pId =
cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
- processInstanceId = Integer.parseInt(pId);
+ CommandType commandTypeIfComplement =
getCommandTypeIfComplement(processInstance, command);
+ // reset global params while repeat running is needed by cmdParam
+ if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+ setGlobalParamIfCommanded(processDefinition, cmdParam);
}
- if (processInstanceId == 0) {
- processInstance =
generateNewProcessInstance(processDefinition, command, cmdParam);
- } else {
- processInstance =
this.findProcessInstanceDetailById(processInstanceId);
- if (processInstance == null) {
- return processInstance;
- }
- CommandType commandTypeIfComplement =
getCommandTypeIfComplement(processInstance, command);
-
- // reset global params while repeat running is needed by
cmdParam
- if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
- setGlobalParamIfCommanded(processDefinition, cmdParam);
- }
-
- // Recalculate global parameters after rerun.
-
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+ // Recalculate global parameters after rerun.
+ processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime()));
- processInstance.setProcessDefinition(processDefinition);
- }
- //reset command parameter
- if (processInstance.getCommandParam() != null) {
- Map<String, String> processCmdParam =
JSONUtils.toMap(processInstance.getCommandParam());
- for (Map.Entry<String, String> entry :
processCmdParam.entrySet()) {
- if (!cmdParam.containsKey(entry.getKey())) {
- cmdParam.put(entry.getKey(), entry.getValue());
- }
+ processInstance.setProcessDefinition(processDefinition);
+ }
+ //reset command parameter
+ if (processInstance.getCommandParam() != null) {
+ Map<String, String> processCmdParam =
JSONUtils.toMap(processInstance.getCommandParam());
+ for (Map.Entry<String, String> entry : processCmdParam.entrySet())
{
+ if (!cmdParam.containsKey(entry.getKey())) {
+ cmdParam.put(entry.getKey(), entry.getValue());
}
}
- // reset command parameter if sub process
- if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
- processInstance.setCommandParam(command.getCommandParam());
- }
- } else {
- // generate one new process instance
- processInstance = generateNewProcessInstance(processDefinition,
command, cmdParam);
+ }
+ // reset command parameter if sub process
+ if (cmdParam != null &&
cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
+ processInstance.setCommandParam(command.getCommandParam());
}
if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
logger.error("command parameter check failed!");
return null;
}
-
if (command.getScheduleTime() != null) {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setHost(host);
-
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
int runTime = processInstance.getRunTimes();
switch (commandType) {
@@ -846,7 +821,7 @@ public class ProcessService {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
- String.join(Constants.COMMA,
convertIntListToString(failedList)));
+ String.join(Constants.COMMA,
convertIntListToString(failedList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@@ -859,7 +834,7 @@ public class ProcessService {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList =
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList =
findTaskIdByInstanceState(processInstance.getId(),
- ExecutionStatus.KILL);
+ ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for (Integer taskId : suspendedNodeList) {
// initialize the pause state
@@ -1274,7 +1249,7 @@ public class ProcessService {
}
}
String processParam = getSubWorkFlowParam(instanceMap,
parentProcessInstance, fatherParams);
-
+ int subProcessInstanceId = childInstance == null ? 0 :
childInstance.getId();
return new Command(
commandType,
TaskDependType.TASK_POST,
@@ -1288,7 +1263,9 @@ public class ProcessService {
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority(),
- parentProcessInstance.getDryRun()
+ parentProcessInstance.getDryRun(),
+ subProcessInstanceId,
+ parentProcessInstance.getProcessDefinitionVersion()
);
}
@@ -1886,6 +1863,8 @@ public class ProcessService {
//2 insert into recover command
Command cmd = new Command();
cmd.setProcessDefinitionCode(processDefinition.getCode());
+ cmd.setProcessDefinitionVersion(processDefinition.getVersion());
+ cmd.setProcessInstanceId(processInstance.getId());
cmd.setCommandParam(String.format("{\"%s\":%d}",
Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 1de5c56..3c9f165 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -101,6 +101,7 @@ public class ProcessScheduleJob implements Job {
command.setWorkerGroup(workerGroup);
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
+ command.setProcessDefinitionVersion(processDefinition.getVersion());
getProcessService().createCommand(command);
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index e13558d..2112563 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -82,7 +82,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.Lists;
/**
* process service test
@@ -119,6 +118,8 @@ public class ProcessServiceTest {
@Mock
private ResourceMapper resourceMapper;
+ private HashMap<String, ProcessDefinition> processDefinitionCacheMaps =
new HashMap<>();
+
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
@@ -240,56 +241,65 @@ public class ProcessServiceTest {
//cannot construct process instance, return null;
String host = "127.0.0.1";
- int validThreadNum = 1;
Command command = new Command();
command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING +
"\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
- Assert.assertNull(processService.handleCommand(logger, host,
validThreadNum, command));
+ Assert.assertNull(processService.handleCommand(logger, host, command,
processDefinitionCacheMaps));
+ int definitionVersion = 1;
+ long definitionCode = 123;
+ int processInstanceId = 222;
//there is not enough thread for this command
Command command1 = new Command();
- command1.setProcessDefinitionCode(123);
+ command1.setProcessDefinitionCode(definitionCode);
+ command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123);
processDefinition.setName("test");
- processDefinition.setVersion(1);
- processDefinition.setCode(11L);
+ processDefinition.setVersion(definitionVersion);
+ processDefinition.setCode(definitionCode);
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
ProcessInstance processInstance = new ProcessInstance();
- processInstance.setId(222);
- processInstance.setProcessDefinitionCode(11L);
- processInstance.setProcessDefinitionVersion(1);
-
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
+ processInstance.setId(processInstanceId);
+ processInstance.setProcessDefinitionCode(definitionCode);
+ processInstance.setProcessDefinitionVersion(definitionVersion);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new
ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
- Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command1));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command1, processDefinitionCacheMaps));
Command command2 = new Command();
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
- command2.setProcessDefinitionCode(123);
+ command2.setProcessDefinitionCode(definitionCode);
+ command2.setProcessDefinitionVersion(definitionVersion);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
+ command2.setProcessInstanceId(processInstanceId);
- Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command2));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command2, processDefinitionCacheMaps));
Command command3 = new Command();
- command3.setProcessDefinitionCode(123);
+ command3.setProcessDefinitionCode(definitionCode);
+ command3.setProcessDefinitionVersion(definitionVersion);
+ command3.setProcessInstanceId(processInstanceId);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command3));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command3, processDefinitionCacheMaps));
Command command4 = new Command();
- command4.setProcessDefinitionCode(123);
+ command4.setProcessDefinitionCode(definitionCode);
+ command4.setProcessDefinitionVersion(definitionVersion);
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING);
- Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command4));
+ command4.setProcessInstanceId(processInstanceId);
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command4, processDefinitionCacheMaps));
Command command5 = new Command();
- command5.setProcessDefinitionCode(123);
+ command5.setProcessDefinitionCode(definitionCode);
+ command5.setProcessDefinitionVersion(definitionVersion);
HashMap<String, String> startParams = new HashMap<>();
startParams.put("startParam1", "testStartParam1");
HashMap<String, String> commandParams = new HashMap<>();
@@ -297,7 +307,7 @@ public class ProcessServiceTest {
command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
- ProcessInstance processInstance1 = processService.handleCommand(logger, host, validThreadNum, command5);
+ ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
}
diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql
index 1565ab4..c2394ef 100644
--- a/sql/dolphinscheduler_h2.sql
+++ b/sql/dolphinscheduler_h2.sql
@@ -328,6 +328,8 @@ CREATE TABLE t_ds_command
worker_group varchar(64),
environment_code bigint(20) DEFAULT '-1',
dry_run int NULL DEFAULT 0,
+ process_instance_id int(11) DEFAULT 0,
+ process_definition_version int(11) DEFAULT 0,
PRIMARY KEY (id),
KEY priority_id_index (process_instance_priority, id)
);
@@ -381,6 +383,8 @@ CREATE TABLE t_ds_error_command
environment_code bigint(20) DEFAULT '-1',
message text,
dry_run int NULL DEFAULT 0,
+ process_instance_id int(11) DEFAULT 0,
+ process_definition_version int(11) DEFAULT 0,
PRIMARY KEY (id)
);
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 8aa1519..a54e873 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -317,22 +317,24 @@ CREATE TABLE `t_ds_alertgroup`(
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_command`;
CREATE TABLE `t_ds_command` (
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
- `command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
- `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
- `command_param` text COMMENT 'json command parameters',
- `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
- `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
- `warning_type` tinyint(4) DEFAULT '0' COMMENT 'Alarm type: 0 is not sent, 1 process is sent successfully, 2 process is sent failed, 3 process is sent successfully and all failures are sent',
- `warning_group_id` int(11) DEFAULT NULL COMMENT 'warning group',
- `schedule_time` datetime DEFAULT NULL COMMENT 'schedule time',
- `start_time` datetime DEFAULT NULL COMMENT 'start time',
- `executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
- `update_time` datetime DEFAULT NULL COMMENT 'update time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
+ `command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
+ `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
+ `command_param` text COMMENT 'json command parameters',
+ `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
+ `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
+ `warning_type` tinyint(4) DEFAULT '0' COMMENT 'Alarm type: 0 is not sent, 1 process is sent successfully, 2 process is sent failed, 3 process is sent successfully and all failures are sent',
+ `warning_group_id` int(11) DEFAULT NULL COMMENT 'warning group',
+ `schedule_time` datetime DEFAULT NULL COMMENT 'schedule time',
+ `start_time` datetime DEFAULT NULL COMMENT 'start time',
+ `executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
+ `update_time` datetime DEFAULT NULL COMMENT 'update time',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
- `worker_group` varchar(64) COMMENT 'worker group',
- `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
- `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run',
+ `worker_group` varchar(64) COMMENT 'worker group',
+ `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
+ `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run',
+ `process_instance_id` int(11) DEFAULT 0 COMMENT 'process instance id',
+ `process_definition_version` int(11) DEFAULT 0 COMMENT 'process definition version',
PRIMARY KEY (`id`),
KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@@ -384,6 +386,8 @@ CREATE TABLE `t_ds_error_command` (
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`message` text COMMENT 'message',
`dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
+ `process_instance_id` int(11) DEFAULT 0 COMMENT 'process instance id: 0',
+ `process_definition_version` int(11) DEFAULT 0 COMMENT 'process definition version',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 27f5259..2f02cf2 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -240,21 +240,23 @@ CREATE TABLE t_ds_alertgroup(
DROP TABLE IF EXISTS t_ds_command;
CREATE TABLE t_ds_command (
id int NOT NULL ,
- command_type int DEFAULT NULL ,
- process_definition_code bigint NOT NULL ,
- command_param text ,
- task_depend_type int DEFAULT NULL ,
- failure_strategy int DEFAULT '0' ,
- warning_type int DEFAULT '0' ,
- warning_group_id int DEFAULT NULL ,
- schedule_time timestamp DEFAULT NULL ,
- start_time timestamp DEFAULT NULL ,
- executor_id int DEFAULT NULL ,
- update_time timestamp DEFAULT NULL ,
+ command_type int DEFAULT NULL ,
+ process_definition_code bigint NOT NULL ,
+ command_param text ,
+ task_depend_type int DEFAULT NULL ,
+ failure_strategy int DEFAULT '0' ,
+ warning_type int DEFAULT '0' ,
+ warning_group_id int DEFAULT NULL ,
+ schedule_time timestamp DEFAULT NULL ,
+ start_time timestamp DEFAULT NULL ,
+ executor_id int DEFAULT NULL ,
+ update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
- worker_group varchar(64),
- environment_code bigint DEFAULT '-1',
- dry_run int DEFAULT '0' ,
+ worker_group varchar(64),
+ environment_code bigint DEFAULT '-1',
+ dry_run int DEFAULT '0' ,
+ process_instance_id int DEFAULT 0,
+ process_definition_version int DEFAULT 0,
PRIMARY KEY (id)
) ;
@@ -284,23 +286,24 @@ CREATE TABLE t_ds_datasource (
DROP TABLE IF EXISTS t_ds_error_command;
CREATE TABLE t_ds_error_command (
- id int NOT NULL ,
- command_type int DEFAULT NULL ,
- executor_id int DEFAULT NULL ,
- process_definition_code bigint NOT NULL ,
- command_param text ,
- task_depend_type int DEFAULT NULL ,
- failure_strategy int DEFAULT '0' ,
- warning_type int DEFAULT '0' ,
- warning_group_id int DEFAULT NULL ,
- schedule_time timestamp DEFAULT NULL ,
- start_time timestamp DEFAULT NULL ,
- update_time timestamp DEFAULT NULL ,
+ id int NOT NULL ,
+ command_type int DEFAULT NULL ,
+ process_definition_code bigint NOT NULL ,
+ command_param text ,
+ task_depend_type int DEFAULT NULL ,
+ failure_strategy int DEFAULT '0' ,
+ warning_type int DEFAULT '0' ,
+ warning_group_id int DEFAULT NULL ,
+ schedule_time timestamp DEFAULT NULL ,
+ start_time timestamp DEFAULT NULL ,
+ executor_id int DEFAULT NULL ,
+ update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
- worker_group varchar(64),
- environment_code bigint DEFAULT '-1',
- message text ,
- dry_ru int DEFAULT '0' ,
+ worker_group varchar(64),
+ environment_code bigint DEFAULT '-1',
+ dry_run int DEFAULT '0' ,
+ process_instance_id int DEFAULT 0,
+ process_definition_version int DEFAULT 0,
PRIMARY KEY (id)
);
--