This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 2.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0-prepare by this push:
     new 6ffd019  [cherry-pick][6471-6502]cherry-pick 6471 && 6502 to 
2.0-prepare (#6508)
6ffd019 is described below

commit 6ffd019683c84b61378a56895f2903ec460311c4
Author: OS <[email protected]>
AuthorDate: Tue Oct 12 17:32:04 2021 +0800

    [cherry-pick][6471-6502]cherry-pick 6471 && 6502 to 2.0-prepare (#6508)
---
 .../api/service/impl/ExecutorServiceImpl.java      |  16 ++-
 .../dolphinscheduler/dao/entity/Command.java       |  41 ++++++-
 .../dolphinscheduler/dao/mapper/CommandMapper.java |   7 --
 .../dolphinscheduler/dao/mapper/CommandMapper.xml  |  10 --
 .../dao/mapper/CommandMapperTest.java              |   4 +-
 .../server/master/config/MasterConfig.java         |  12 ++
 .../master/runner/MasterSchedulerService.java      |  28 ++++-
 .../src/main/resources/master.properties           |   3 +
 .../service/process/ProcessService.java            | 133 +++++++++------------
 .../service/quartz/ProcessScheduleJob.java         |   1 +
 .../service/process/ProcessServiceTest.java        |  48 +++++---
 sql/dolphinscheduler_h2.sql                        |   4 +
 sql/dolphinscheduler_mysql.sql                     |  34 +++---
 sql/dolphinscheduler_postgre.sql                   |  63 +++++-----
 14 files changed, 235 insertions(+), 169 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 5042a03..fe5f910 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -283,13 +283,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
 
         switch (executeType) {
             case REPEAT_RUNNING:
-                result = insertCommand(loginUser, processInstanceId, 
processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams);
+                result = insertCommand(loginUser, processInstanceId, 
processDefinition.getCode(), processDefinition.getVersion(), 
CommandType.REPEAT_RUNNING, startParams);
                 break;
             case RECOVER_SUSPENDED_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, 
processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, 
startParams);
+                result = insertCommand(loginUser, processInstanceId, 
processDefinition.getCode(), processDefinition.getVersion(), 
CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
                 break;
             case START_FAILURE_TASK_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, 
processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, 
startParams);
+                result = insertCommand(loginUser, processInstanceId, 
processDefinition.getCode(), processDefinition.getVersion(), 
CommandType.START_FAILURE_TASK_PROCESS, startParams);
                 break;
             case STOP:
                 if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -409,10 +409,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
      * @param loginUser           login user
      * @param instanceId          instance id
      * @param processDefinitionCode process definition code
+     * @param version
      * @param commandType         command type
      * @return insert result code
      */
-    private Map<String, Object> insertCommand(User loginUser, Integer 
instanceId, long processDefinitionCode, CommandType commandType, String 
startParams) {
+    private Map<String, Object> insertCommand(User loginUser, Integer 
instanceId, long processDefinitionCode, int processVersion, CommandType 
commandType, String startParams) {
         Map<String, Object> result = new HashMap<>();
 
         //To add startParams only when repeat running is needed
@@ -427,6 +428,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         command.setProcessDefinitionCode(processDefinitionCode);
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
         command.setExecutorId(loginUser.getId());
+        command.setProcessDefinitionVersion(processVersion);
+        command.setProcessInstanceId(instanceId);
 
         if (!processService.verifyIsNeedCreateCommand(command)) {
             putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, 
processDefinitionCode);
@@ -545,6 +548,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         command.setWorkerGroup(workerGroup);
         command.setEnvironmentCode(environmentCode);
         command.setDryRun(dryRun);
+        ProcessDefinition processDefinition = 
processService.findProcessDefinitionByCode(processDefineCode);
+        if (processDefinition != null) {
+            
command.setProcessDefinitionVersion(processDefinition.getVersion());
+        }
+        command.setProcessInstanceId(0);
 
         Date start = null;
         Date end = null;
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index b1ed217..ae2ff62 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -132,6 +132,12 @@ public class Command {
     @TableField("dry_run")
     private int dryRun;
 
+    @TableField("process_instance_id")
+    private int processInstanceId;
+
+    @TableField("process_definition_version")
+    private int processDefinitionVersion;
+
     public Command() {
         this.taskDependType = TaskDependType.TASK_POST;
         this.failureStrategy = FailureStrategy.CONTINUE;
@@ -152,7 +158,10 @@ public class Command {
             String workerGroup,
             Long environmentCode,
             Priority processInstancePriority,
-            int dryRun) {
+            int dryRun,
+            int processInstanceId,
+            int processDefinitionVersion
+    ) {
         this.commandType = commandType;
         this.executorId = executorId;
         this.processDefinitionCode = processDefinitionCode;
@@ -168,6 +177,8 @@ public class Command {
         this.environmentCode = environmentCode;
         this.processInstancePriority = processInstancePriority;
         this.dryRun = dryRun;
+        this.processInstanceId = processInstanceId;
+        this.processDefinitionVersion = processDefinitionVersion;
     }
 
     public TaskDependType getTaskDependType() {
@@ -298,6 +309,22 @@ public class Command {
         this.dryRun = dryRun;
     }
 
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -353,8 +380,13 @@ public class Command {
         if (processInstancePriority != command.processInstancePriority) {
             return false;
         }
+        if (processInstanceId != command.processInstanceId) {
+            return false;
+        }
+        if (processDefinitionVersion != command.getProcessDefinitionVersion()) 
{
+            return false;
+        }
         return !(updateTime != null ? !updateTime.equals(command.updateTime) : 
command.updateTime != null);
-
     }
 
     @Override
@@ -375,6 +407,8 @@ public class Command {
         result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 
0);
         result = 31 * result + (environmentCode != null ? 
environmentCode.hashCode() : 0);
         result = 31 * result + dryRun;
+        result = 31 * result + processInstanceId;
+        result = 31 * result + processDefinitionVersion;
         return result;
     }
 
@@ -397,7 +431,10 @@ public class Command {
                 + ", workerGroup='" + workerGroup + '\''
                 + ", environmentCode='" + environmentCode + '\''
                 + ", dryRun='" + dryRun + '\''
+                + ", processInstanceId='" + processInstanceId + '\''
+                + ", processDefinitionVersion='" + processDefinitionVersion + 
'\''
                 + '}';
     }
+
 }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index 2bbfb4b..2291384 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -31,13 +31,6 @@ import java.util.List;
  */
 public interface CommandMapper extends BaseMapper<Command> {
 
-
-    /**
-     * get one command
-     * @return command
-     */
-    Command getOneToRun();
-
     /**
      * count command state
      * @param userId userId
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index 5b2d6b4..b0ea477 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -18,16 +18,6 @@
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"; >
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper">
-    <select id="getOneToRun" 
resultType="org.apache.dolphinscheduler.dao.entity.Command">
-        select cmd.id, cmd.command_type, cmd.process_definition_code, 
cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
-        cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, 
cmd.start_time, cmd.executor_id, cmd.update_time,
-        cmd.process_instance_priority, cmd.worker_group, cmd.environment_code, 
cmd.dry_run
-        from t_ds_command cmd
-        join t_ds_process_definition definition on cmd.process_definition_code 
= definition.code
-        where definition.release_state = 1 AND definition.flag = 1
-        order by cmd.update_time asc
-        limit 1
-    </select>
     <select id="countCommandState" 
resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
         select cmd.command_type as command_type, count(1) as count
         from t_ds_command cmd, t_ds_process_definition process
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index dc9dbee..0266ac0 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -147,7 +147,7 @@ public class CommandMapperTest {
 
         createCommand(CommandType.START_PROCESS, processDefinition.getCode());
 
-        Command actualCommand = commandMapper.getOneToRun();
+        List<Command> actualCommand = commandMapper.queryCommandPage(1,0);
 
         assertNotNull(actualCommand);
     }
@@ -259,6 +259,8 @@ public class CommandMapperTest {
         command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
         command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
         command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setProcessInstanceId(0);
+        command.setProcessDefinitionVersion(0);
         commandMapper.insert(command);
 
         return command;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 6c2e2a1..124eceb 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -60,6 +60,9 @@ public class MasterConfig {
     @Value("${master.reserved.memory:0.3}")
     private double masterReservedMemory;
 
+    @Value("${master.cache.process.definition:true}")
+    private boolean masterCacheProcessDefinition;
+
     public int getListenPort() {
         return listenPort;
     }
@@ -150,4 +153,13 @@ public class MasterConfig {
     public void setStateWheelInterval(int stateWheelInterval) {
         this.stateWheelInterval = stateWheelInterval;
     }
+
+    public boolean getMasterCacheProcessDefinition() {
+        return masterCacheProcessDefinition;
+    }
+
+    public void setMasterCacheProcessDefinition(boolean 
masterCacheProcessDefinition) {
+        this.masterCacheProcessDefinition = masterCacheProcessDefinition;
+    }
+
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index faa4eb0..803ba09 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@@ -34,6 +35,7 @@ import 
org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -92,11 +94,26 @@ public class MasterSchedulerService extends Thread {
      */
     private ThreadPoolExecutor masterExecService;
 
-
+    /**
+     * process instance execution list
+     */
     private ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceExecMaps;
+    /**
+     * process timeout check list
+     */
     ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new 
ConcurrentHashMap<>();
+
+    /**
+     * task time out checkout list
+     */
     ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new 
ConcurrentHashMap<>();
 
+    /**
+     * key:code-version
+     * value: processDefinition
+     */
+    HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new 
HashMap<>();
+
     private StateWheelExecuteThread stateWheelExecuteThread;
 
     /**
@@ -112,7 +129,6 @@ public class MasterSchedulerService extends Thread {
                 taskTimeoutCheckList,
                 this.processInstanceExecMaps,
                 masterConfig.getStateWheelInterval() * 
Constants.SLEEP_TIME_MILLIS);
-
     }
 
     @Override
@@ -165,7 +181,6 @@ public class MasterSchedulerService extends Thread {
      */
     private void scheduleProcess() throws Exception {
 
-        int activeCount = masterExecService.getActiveCount();
         // make sure to scan and delete command  table in one transaction
         Command command = findOneCommand();
         if (command != null) {
@@ -173,7 +188,12 @@ public class MasterSchedulerService extends Thread {
             try {
                 ProcessInstance processInstance = 
processService.handleCommand(logger,
                         getLocalAddress(),
-                        this.masterConfig.getMasterExecThreads() - 
activeCount, command);
+                        command,
+                        processDefinitionCacheMaps);
+                if (!masterConfig.getMasterCacheProcessDefinition()
+                        && processDefinitionCacheMaps.size() > 0) {
+                    processDefinitionCacheMaps.clear();
+                }
                 if (processInstance != null) {
                     WorkflowExecuteThread workflowExecuteThread = new 
WorkflowExecuteThread(
                             processInstance
diff --git a/dolphinscheduler-server/src/main/resources/master.properties 
b/dolphinscheduler-server/src/main/resources/master.properties
index cc45622..fbe8f6e 100644
--- a/dolphinscheduler-server/src/main/resources/master.properties
+++ b/dolphinscheduler-server/src/main/resources/master.properties
@@ -39,6 +39,9 @@
 # master commit task interval, the unit is millisecond
 #master.task.commit.interval=1000
 
+# master cache process definition, default: true
+#master.cache.process.definition=true
+
 # master max cpuload avg, only higher than the system cpu load average, master 
server can schedule. default value -1: the number of cpu cores * 2
 #master.max.cpuload.avg=-1
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 3c3b41d..a92302e 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -208,13 +208,12 @@ public class ProcessService {
      *
      * @param logger logger
      * @param host host
-     * @param validThreadNum validThreadNum
      * @param command found command
+     * @param processDefinitionCacheMaps
      * @return process instance
      */
-    @Transactional(rollbackFor = Exception.class)
-    public ProcessInstance handleCommand(Logger logger, String host, int 
validThreadNum, Command command) {
-        ProcessInstance processInstance = constructProcessInstance(command, 
host);
+    public ProcessInstance handleCommand(Logger logger, String host, Command 
command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
+        ProcessInstance processInstance = constructProcessInstance(command, 
host, processDefinitionCacheMaps);
         // cannot construct process instance, return null
         if (processInstance == null) {
             logger.error("scan command, command parameter is error: {}", 
command);
@@ -235,7 +234,6 @@ public class ProcessService {
      * @param command command
      * @param message message
      */
-    @Transactional(rollbackFor = Exception.class)
     public void moveToErrorCommand(Command command, String message) {
         ErrorCommand errorCommand = new ErrorCommand(command, message);
         this.errorCommandMapper.insert(errorCommand);
@@ -287,15 +285,6 @@ public class ProcessService {
     }
 
     /**
-     * find one command from queue list
-     *
-     * @return command
-     */
-    public Command findOneCommand() {
-        return commandMapper.getOneToRun();
-    }
-
-    /**
      * get command page
      *
      * @param pageSize
@@ -547,7 +536,9 @@ public class ProcessService {
                 processInstance.getWorkerGroup(),
                 processInstance.getEnvironmentCode(),
                 processInstance.getProcessInstancePriority(),
-                processInstance.getDryRun()
+                processInstance.getDryRun(),
+                processInstance.getId(),
+                processInstance.getProcessDefinitionVersion()
             );
             saveCommand(command);
             return;
@@ -744,90 +735,74 @@ public class ProcessService {
     /**
      * construct process instance according to one command.
      *
-     * @param command command
-     * @param host host
+     * @param command                    command
+     * @param host                       host
+     * @param processDefinitionCacheMaps
      * @return process instance
      */
-    private ProcessInstance constructProcessInstance(Command command, String 
host) {
+    private ProcessInstance constructProcessInstance(Command command, String 
host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
         ProcessInstance processInstance;
+        ProcessDefinition processDefinition;
         CommandType commandType = command.getCommandType();
-        Map<String, String> cmdParam = 
JSONUtils.toMap(command.getCommandParam());
-
-        ProcessDefinition processDefinition = 
getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam);
+        String key = String.format("%d-%d", 
command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
+        if (processDefinitionCacheMaps.containsKey(key)) {
+            processDefinition = processDefinitionCacheMaps.get(key);
+        } else {
+            processDefinition = 
this.findProcessDefinition(command.getProcessDefinitionCode(), 
command.getProcessDefinitionVersion());
+            if (processDefinition != null) {
+                processDefinitionCacheMaps.put(key, processDefinition);
+            }
+        }
         if (processDefinition == null) {
             logger.error("cannot find the work process define! define code : 
{}", command.getProcessDefinitionCode());
             return null;
         }
-
+        Map<String, String> cmdParam = 
JSONUtils.toMap(command.getCommandParam());
+        int processInstanceId = command.getProcessInstanceId();
+        if (processInstanceId == 0) {
+            processInstance = generateNewProcessInstance(processDefinition, 
command, cmdParam);
+        } else {
+            processInstance = 
this.findProcessInstanceDetailById(processInstanceId);
+            if (processInstance == null) {
+                return processInstance;
+            }
+        }
         if (cmdParam != null) {
-            int processInstanceId = 0;
-            // recover from failure or pause tasks
-            if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
-                String processId = 
cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
-                processInstanceId = Integer.parseInt(processId);
-                if (processInstanceId == 0) {
-                    logger.error("command parameter is error, [ 
ProcessInstanceId ] is 0");
-                    return null;
-                }
-            } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
-                // sub process map
-                String pId = cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS);
-                processInstanceId = Integer.parseInt(pId);
-            } else if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
-                // waiting thread command
-                String pId = 
cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
-                processInstanceId = Integer.parseInt(pId);
+            CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
+            // reset global params while repeat running is needed by cmdParam
+            if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+                setGlobalParamIfCommanded(processDefinition, cmdParam);
             }
 
-            if (processInstanceId == 0) {
-                processInstance = 
generateNewProcessInstance(processDefinition, command, cmdParam);
-            } else {
-                processInstance = 
this.findProcessInstanceDetailById(processInstanceId);
-                if (processInstance == null) {
-                    return processInstance;
-                }
-                CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
-
-                // reset global params while repeat running is needed by 
cmdParam
-                if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
-                    setGlobalParamIfCommanded(processDefinition, cmdParam);
-                }
-
-                // Recalculate global parameters after rerun.
-                
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+            // Recalculate global parameters after rerun.
+            processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
                     processDefinition.getGlobalParamMap(),
                     processDefinition.getGlobalParamList(),
                     commandTypeIfComplement,
                     processInstance.getScheduleTime()));
-                processInstance.setProcessDefinition(processDefinition);
-            }
-            //reset command parameter
-            if (processInstance.getCommandParam() != null) {
-                Map<String, String> processCmdParam = 
JSONUtils.toMap(processInstance.getCommandParam());
-                for (Map.Entry<String, String> entry : 
processCmdParam.entrySet()) {
-                    if (!cmdParam.containsKey(entry.getKey())) {
-                        cmdParam.put(entry.getKey(), entry.getValue());
-                    }
+            processInstance.setProcessDefinition(processDefinition);
+        }
+        //reset command parameter
+        if (processInstance.getCommandParam() != null) {
+            Map<String, String> processCmdParam = 
JSONUtils.toMap(processInstance.getCommandParam());
+            for (Map.Entry<String, String> entry : processCmdParam.entrySet()) 
{
+                if (!cmdParam.containsKey(entry.getKey())) {
+                    cmdParam.put(entry.getKey(), entry.getValue());
                 }
             }
-            // reset command parameter if sub process
-            if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
-                processInstance.setCommandParam(command.getCommandParam());
-            }
-        } else {
-            // generate one new process instance
-            processInstance = generateNewProcessInstance(processDefinition, 
command, cmdParam);
+        }
+        // reset command parameter if sub process
+        if (cmdParam != null && 
cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
+            processInstance.setCommandParam(command.getCommandParam());
         }
         if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
             logger.error("command parameter check failed!");
             return null;
         }
-
         if (command.getScheduleTime() != null) {
             processInstance.setScheduleTime(command.getScheduleTime());
         }
         processInstance.setHost(host);
-
         ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
         int runTime = processInstance.getRunTimes();
         switch (commandType) {
@@ -846,7 +821,7 @@ public class ProcessService {
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
                 cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
-                    String.join(Constants.COMMA, 
convertIntListToString(failedList)));
+                        String.join(Constants.COMMA, 
convertIntListToString(failedList)));
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;
@@ -859,7 +834,7 @@ public class ProcessService {
                 
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                 List<Integer> suspendedNodeList = 
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
                 List<Integer> stopNodeList = 
findTaskIdByInstanceState(processInstance.getId(),
-                    ExecutionStatus.KILL);
+                        ExecutionStatus.KILL);
                 suspendedNodeList.addAll(stopNodeList);
                 for (Integer taskId : suspendedNodeList) {
                     // initialize the pause state
@@ -1274,7 +1249,7 @@ public class ProcessService {
             }
         }
         String processParam = getSubWorkFlowParam(instanceMap, 
parentProcessInstance, fatherParams);
-
+        int subProcessInstanceId = childInstance == null ? 0 : 
childInstance.getId();
         return new Command(
             commandType,
             TaskDependType.TASK_POST,
@@ -1288,7 +1263,9 @@ public class ProcessService {
             task.getWorkerGroup(),
             task.getEnvironmentCode(),
             parentProcessInstance.getProcessInstancePriority(),
-            parentProcessInstance.getDryRun()
+            parentProcessInstance.getDryRun(),
+            subProcessInstanceId,
+            parentProcessInstance.getProcessDefinitionVersion()
         );
     }
 
@@ -1886,6 +1863,8 @@ public class ProcessService {
         //2 insert into recover command
         Command cmd = new Command();
         cmd.setProcessDefinitionCode(processDefinition.getCode());
+        cmd.setProcessDefinitionVersion(processDefinition.getVersion());
+        cmd.setProcessInstanceId(processInstance.getId());
         cmd.setCommandParam(String.format("{\"%s\":%d}", 
Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
         cmd.setExecutorId(processInstance.getExecutorId());
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 1de5c56..3c9f165 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -101,6 +101,7 @@ public class ProcessScheduleJob implements Job {
         command.setWorkerGroup(workerGroup);
         command.setWarningType(schedule.getWarningType());
         
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
+        command.setProcessDefinitionVersion(processDefinition.getVersion());
 
         getProcessService().createCommand(command);
     }
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index e13558d..2112563 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -82,7 +82,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.Lists;
 
 /**
  * process service test
@@ -119,6 +118,8 @@ public class ProcessServiceTest {
     @Mock
     private ResourceMapper resourceMapper;
 
+    private HashMap<String, ProcessDefinition> processDefinitionCacheMaps = 
new HashMap<>();
+
     @Test
     public void testCreateSubCommand() {
         ProcessInstance parentInstance = new ProcessInstance();
@@ -240,56 +241,65 @@ public class ProcessServiceTest {
 
         //cannot construct process instance, return null;
         String host = "127.0.0.1";
-        int validThreadNum = 1;
         Command command = new Command();
         command.setProcessDefinitionCode(222);
         command.setCommandType(CommandType.REPEAT_RUNNING);
         command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + 
"\":\"111\",\""
             + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
-        Assert.assertNull(processService.handleCommand(logger, host, 
validThreadNum, command));
+        Assert.assertNull(processService.handleCommand(logger, host, command, 
processDefinitionCacheMaps));
 
+        int definitionVersion = 1;
+        long definitionCode = 123;
+        int processInstanceId = 222;
         //there is not enough thread for this command
         Command command1 = new Command();
-        command1.setProcessDefinitionCode(123);
+        command1.setProcessDefinitionCode(definitionCode);
+        command1.setProcessDefinitionVersion(definitionVersion);
         command1.setCommandParam("{\"ProcessInstanceId\":222}");
         command1.setCommandType(CommandType.START_PROCESS);
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setId(123);
         processDefinition.setName("test");
-        processDefinition.setVersion(1);
-        processDefinition.setCode(11L);
+        processDefinition.setVersion(definitionVersion);
+        processDefinition.setCode(definitionCode);
         
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
         ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(222);
-        processInstance.setProcessDefinitionCode(11L);
-        processInstance.setProcessDefinitionVersion(1);
-        
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
+        processInstance.setId(processInstanceId);
+        processInstance.setProcessDefinitionCode(definitionCode);
+        processInstance.setProcessDefinitionVersion(definitionVersion);
         
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
             processInstance.getProcessDefinitionVersion())).thenReturn(new 
ProcessDefinitionLog(processDefinition));
         
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
-        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command1));
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
command1, processDefinitionCacheMaps));
 
         Command command2 = new Command();
         
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
-        command2.setProcessDefinitionCode(123);
+        command2.setProcessDefinitionCode(definitionCode);
+        command2.setProcessDefinitionVersion(definitionVersion);
         command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
+        command2.setProcessInstanceId(processInstanceId);
 
-        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command2));
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
command2, processDefinitionCacheMaps));
 
         Command command3 = new Command();
-        command3.setProcessDefinitionCode(123);
+        command3.setProcessDefinitionCode(definitionCode);
+        command3.setProcessDefinitionVersion(definitionVersion);
+        command3.setProcessInstanceId(processInstanceId);
         command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
         command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
-        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command3));
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
command3, processDefinitionCacheMaps));
 
         Command command4 = new Command();
-        command4.setProcessDefinitionCode(123);
+        command4.setProcessDefinitionCode(definitionCode);
+        command4.setProcessDefinitionVersion(definitionVersion);
         
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
         command4.setCommandType(CommandType.REPEAT_RUNNING);
-        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command4));
+        command4.setProcessInstanceId(processInstanceId);
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
command4, processDefinitionCacheMaps));
 
         Command command5 = new Command();
-        command5.setProcessDefinitionCode(123);
+        command5.setProcessDefinitionCode(definitionCode);
+        command5.setProcessDefinitionVersion(definitionVersion);
         HashMap<String, String> startParams = new HashMap<>();
         startParams.put("startParam1", "testStartParam1");
         HashMap<String, String> commandParams = new HashMap<>();
@@ -297,7 +307,7 @@ public class ProcessServiceTest {
         command5.setCommandParam(JSONUtils.toJsonString(commandParams));
         command5.setCommandType(CommandType.START_PROCESS);
         command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
-        ProcessInstance processInstance1 = 
processService.handleCommand(logger, host, validThreadNum, command5);
+        ProcessInstance processInstance1 = 
processService.handleCommand(logger, host, command5, 
processDefinitionCacheMaps);
         
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
     }
 
diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql
index 1565ab4..c2394ef 100644
--- a/sql/dolphinscheduler_h2.sql
+++ b/sql/dolphinscheduler_h2.sql
@@ -328,6 +328,8 @@ CREATE TABLE t_ds_command
     worker_group              varchar(64),
     environment_code          bigint(20) DEFAULT '-1',
     dry_run                   int NULL DEFAULT 0,
+    process_instance_id       int(11) DEFAULT 0,
+    process_definition_version int(11) DEFAULT 0,
     PRIMARY KEY (id),
     KEY                       priority_id_index (process_instance_priority, id)
 );
@@ -381,6 +383,8 @@ CREATE TABLE t_ds_error_command
     environment_code          bigint(20) DEFAULT '-1',
     message                   text,
     dry_run                   int NULL DEFAULT 0,
+    process_instance_id       int(11) DEFAULT 0,
+    process_definition_version int(11) DEFAULT 0,
     PRIMARY KEY (id)
 );
 
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 8aa1519..a54e873 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -317,22 +317,24 @@ CREATE TABLE `t_ds_alertgroup`(
 -- ----------------------------
 DROP TABLE IF EXISTS `t_ds_command`;
 CREATE TABLE `t_ds_command` (
-  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
-  `command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start 
workflow, 1 start execution from current node, 2 resume fault-tolerant 
workflow, 3 resume pause process, 4 start execution from failed node, 5 
complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
-  `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process 
definition code',
-  `command_param` text COMMENT 'json command parameters',
-  `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 
current node, 1 forward, 2 backward',
-  `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 
continue',
-  `warning_type` tinyint(4) DEFAULT '0' COMMENT 'Alarm type: 0 is not sent, 1 
process is sent successfully, 2 process is sent failed, 3 process is sent 
successfully and all failures are sent',
-  `warning_group_id` int(11) DEFAULT NULL COMMENT 'warning group',
-  `schedule_time` datetime DEFAULT NULL COMMENT 'schedule time',
-  `start_time` datetime DEFAULT NULL COMMENT 'start time',
-  `executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
-  `update_time` datetime DEFAULT NULL COMMENT 'update time',
+  `id`                        int(11)    NOT NULL AUTO_INCREMENT COMMENT 'key',
+  `command_type`              tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 
start workflow, 1 start execution from current node, 2 resume fault-tolerant 
workflow, 3 resume pause process, 4 start execution from failed node, 5 
complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
+  `process_definition_code`   bigint(20) DEFAULT NULL COMMENT 'process 
definition code',
+  `command_param`             text COMMENT 'json command parameters',
+  `task_depend_type`          tinyint(4) DEFAULT NULL COMMENT 'Node dependency 
type: 0 current node, 1 forward, 2 backward',
+  `failure_strategy`          tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 
end, 1 continue',
+  `warning_type`              tinyint(4) DEFAULT '0' COMMENT 'Alarm type: 0 is 
not sent, 1 process is sent successfully, 2 process is sent failed, 3 process 
is sent successfully and all failures are sent',
+  `warning_group_id`          int(11) DEFAULT NULL COMMENT 'warning group',
+  `schedule_time`             datetime DEFAULT NULL COMMENT 'schedule time',
+  `start_time`                datetime DEFAULT NULL COMMENT 'start time',
+  `executor_id`               int(11) DEFAULT NULL COMMENT 'executor id',
+  `update_time`               datetime DEFAULT NULL COMMENT 'update time',
   `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance 
priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
-  `worker_group` varchar(64)  COMMENT 'worker group',
-  `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
-  `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run',
+  `worker_group`              varchar(64)  COMMENT 'worker group',
+  `environment_code`          bigint(20) DEFAULT '-1' COMMENT 'environment 
code',
+  `dry_run`                   int NULL DEFAULT 0 COMMENT 'dry run flag:0 
normal, 1 dry run',
+  `process_instance_id`       int(11) DEFAULT 0 COMMENT 'process instance id',
+  `process_definition_version` int(11) DEFAULT 0 COMMENT 'process definition 
version',
   PRIMARY KEY (`id`),
   KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@@ -384,6 +386,8 @@ CREATE TABLE `t_ds_error_command` (
   `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
   `message` text COMMENT 'message',
   `dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
+  `process_instance_id` int(11) DEFAULT 0 COMMENT 'process instance id: 0',
+  `process_definition_version` int(11) DEFAULT 0 COMMENT 'process definition 
version',
   PRIMARY KEY (`id`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
 
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 27f5259..2f02cf2 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -240,21 +240,23 @@ CREATE TABLE t_ds_alertgroup(
 DROP TABLE IF EXISTS t_ds_command;
 CREATE TABLE t_ds_command (
   id int NOT NULL  ,
-  command_type int DEFAULT NULL ,
-  process_definition_code bigint NOT NULL ,
-  command_param text ,
-  task_depend_type int DEFAULT NULL ,
-  failure_strategy int DEFAULT '0' ,
-  warning_type int DEFAULT '0' ,
-  warning_group_id int DEFAULT NULL ,
-  schedule_time timestamp DEFAULT NULL ,
-  start_time timestamp DEFAULT NULL ,
-  executor_id int DEFAULT NULL ,
-  update_time timestamp DEFAULT NULL ,
+  command_type              int DEFAULT NULL ,
+  process_definition_code   bigint NOT NULL ,
+  command_param             text ,
+  task_depend_type          int DEFAULT NULL ,
+  failure_strategy          int DEFAULT '0' ,
+  warning_type              int DEFAULT '0' ,
+  warning_group_id          int DEFAULT NULL ,
+  schedule_time             timestamp DEFAULT NULL ,
+  start_time                timestamp DEFAULT NULL ,
+  executor_id               int DEFAULT NULL ,
+  update_time               timestamp DEFAULT NULL ,
   process_instance_priority int DEFAULT NULL ,
-  worker_group varchar(64),
-  environment_code bigint DEFAULT '-1',
-  dry_run int DEFAULT '0' ,
+  worker_group              varchar(64),
+  environment_code          bigint DEFAULT '-1',
+  dry_run                   int DEFAULT '0' ,
+  process_instance_id       int DEFAULT 0,
+  process_definition_version int DEFAULT 0,
   PRIMARY KEY (id)
 ) ;
 
@@ -284,23 +286,24 @@ CREATE TABLE t_ds_datasource (
 
 DROP TABLE IF EXISTS t_ds_error_command;
 CREATE TABLE t_ds_error_command (
-  id int NOT NULL ,
-  command_type int DEFAULT NULL ,
-  executor_id int DEFAULT NULL ,
-  process_definition_code bigint NOT NULL ,
-  command_param text ,
-  task_depend_type int DEFAULT NULL ,
-  failure_strategy int DEFAULT '0' ,
-  warning_type int DEFAULT '0' ,
-  warning_group_id int DEFAULT NULL ,
-  schedule_time timestamp DEFAULT NULL ,
-  start_time timestamp DEFAULT NULL ,
-  update_time timestamp DEFAULT NULL ,
+  id int NOT NULL  ,
+  command_type              int DEFAULT NULL ,
+  process_definition_code   bigint NOT NULL ,
+  command_param             text ,
+  task_depend_type          int DEFAULT NULL ,
+  failure_strategy          int DEFAULT '0' ,
+  warning_type              int DEFAULT '0' ,
+  warning_group_id          int DEFAULT NULL ,
+  schedule_time             timestamp DEFAULT NULL ,
+  start_time                timestamp DEFAULT NULL ,
+  executor_id               int DEFAULT NULL ,
+  update_time               timestamp DEFAULT NULL ,
   process_instance_priority int DEFAULT NULL ,
-  worker_group varchar(64),
-  environment_code bigint DEFAULT '-1',
-  message text ,
-  dry_ru int DEFAULT '0' ,
+  worker_group              varchar(64),
+  environment_code          bigint DEFAULT '-1',
+  dry_run                   int DEFAULT '0' ,
+  process_instance_id       int DEFAULT 0,
+  process_definition_version int DEFAULT 0,
   PRIMARY KEY (id)
 );
 --

Reply via email to