This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e411d0  [Improvement][Code style] FIX SPELL WAITTING TO WAITING , etc. (#4118)
3e411d0 is described below

commit 3e411d075fffa1efd58484a1a29c54f7bfab9983
Author: felix.wang <[email protected]>
AuthorDate: Tue Dec 1 08:58:55 2020 +0800

    [Improvement][Code style] FIX SPELL WAITTING TO WAITING , etc. (#4118)
    
    * FIX SPELL
    
    * FIX SPELL AND  Optimizing code conventions
    
    * add ut  cannot construct process instance, return null;
    
    * add ut testExportProcessMetaData
    
    * add ut testExportProcessMetaData
    
    * add ut testImportProcessSchedule
    
    * add ut MasterExecThreadTest
    
    * add ut MasterExecThreadTest
    
    * add ut testSubProcessViewTree
    
    * add ut testComplementWithStartNodeList
    
    * add ut testRecurseFindSubProcessId
    
    * add ut testRecurseFindSubProcessId
    
    * add ut testRecurseFindSubProcessId
---
 .../api/service/ExecutorService.java               | 193 +++++-----
 .../service/impl/ProcessDefinitionServiceImpl.java |  30 +-
 .../api/service/ExecutorService2Test.java          |  28 +-
 .../api/service/ProcessDefinitionServiceTest.java  | 170 +++++++--
 .../apache/dolphinscheduler/common/Constants.java  |  16 +-
 .../server/master/runner/MasterExecThread.java     | 406 +++++++++++----------
 .../server/master/MasterExecThreadTest.java        | 117 ++++--
 .../service/process/ProcessService.java            |  64 ++--
 .../service/process/ProcessServiceTest.java        | 211 ++++++++++-
 9 files changed, 849 insertions(+), 386 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index fb735ec..7a0fd0f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -14,39 +14,62 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.api.service;
 
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
+import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.text.ParseException;
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 /**
  * executor service
  */
 @Service
-public class ExecutorService extends BaseService{
+public class ExecutorService extends BaseService {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ExecutorService.class);
 
@@ -73,22 +96,22 @@ public class ExecutorService extends BaseService{
     /**
      * execute process instance
      *
-     * @param loginUser             login user
-     * @param projectName           project name
-     * @param processDefinitionId   process Definition Id
-     * @param cronTime              cron time
-     * @param commandType           command type
-     * @param failureStrategy       failuer strategy
-     * @param startNodeList         start nodelist
-     * @param taskDependType        node dependency type
-     * @param warningType           warning type
-     * @param warningGroupId         notify group id
-     * @param receivers             receivers
-     * @param receiversCc           receivers cc
+     * @param loginUser login user
+     * @param projectName project name
+     * @param processDefinitionId process Definition Id
+     * @param cronTime cron time
+     * @param commandType command type
+     * @param failureStrategy failuer strategy
+     * @param startNodeList start nodelist
+     * @param taskDependType node dependency type
+     * @param warningType warning type
+     * @param warningGroupId notify group id
+     * @param receivers receivers
+     * @param receiversCc receivers cc
      * @param processInstancePriority process instance priority
      * @param workerGroup worker group name
      * @param runMode run mode
-     * @param timeout               timeout
+     * @param timeout timeout
      * @return execute process instance code
      * @throws ParseException Parse Exception
      */
@@ -101,23 +124,23 @@ public class ExecutorService extends BaseService{
         Map<String, Object> result = new HashMap<>();
         // timeout is invalid
         if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
-            putMsg(result,Status.TASK_TIMEOUT_PARAMS_ERROR);
+            putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
             return result;
         }
         Project project = projectMapper.queryByName(projectName);
         Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, 
projectName, project);
-        if (checkResultAndAuth != null){
+        if (checkResultAndAuth != null) {
             return checkResultAndAuth;
         }
 
         // check process define release state
         ProcessDefinition processDefinition = 
processDefinitionMapper.selectById(processDefinitionId);
         result = checkProcessDefinitionValid(processDefinition, 
processDefinitionId);
-        if(result.get(Constants.STATUS) != Status.SUCCESS){
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
 
-        if (!checkTenantSuitable(processDefinition)){
+        if (!checkTenantSuitable(processDefinition)) {
             logger.error("there is not any valid tenant for the process 
definition: id:{},name:{}, ",
                     processDefinition.getId(), processDefinition.getName());
             putMsg(result, Status.TENANT_NOT_SUITABLE);
@@ -129,14 +152,13 @@ public class ExecutorService extends BaseService{
             return result;
         }
 
-
         /**
          * create command
          */
         int create = this.createCommand(commandType, processDefinitionId,
                 taskDependType, failureStrategy, startNodeList, cronTime, 
warningType, loginUser.getId(),
-                warningGroupId, runMode,processInstancePriority, workerGroup);
-        if(create > 0 ){
+                warningGroupId, runMode, processInstancePriority, workerGroup);
+        if (create > 0) {
             /**
              * according to the process definition ID updateProcessInstance 
and CC recipient
              */
@@ -152,6 +174,7 @@ public class ExecutorService extends BaseService{
 
     /**
      * check whether master exists
+     *
      * @param result result
      * @return master exists return true , otherwise return false
      */
@@ -167,7 +190,6 @@ public class ExecutorService extends BaseService{
         return true;
     }
 
-
     /**
      * check whether the process definition can be executed
      *
@@ -175,22 +197,20 @@ public class ExecutorService extends BaseService{
      * @param processDefineId process definition id
      * @return check result code
      */
-    public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition 
processDefinition, int processDefineId){
+    public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition 
processDefinition, int processDefineId) {
         Map<String, Object> result = new HashMap<>();
         if (processDefinition == null) {
             // check process definition exists
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,processDefineId);
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
         } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) 
{
             // check process definition online
-            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,processDefineId);
-        }else{
+            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
+        } else {
             result.put(Constants.STATUS, Status.SUCCESS);
         }
         return result;
     }
 
-
-
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, 
recover from stop
      *
@@ -214,7 +234,6 @@ public class ExecutorService extends BaseService{
             return result;
         }
 
-
         ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(processInstanceId);
         if (processInstance == null) {
             putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, 
processInstanceId);
@@ -222,7 +241,7 @@ public class ExecutorService extends BaseService{
         }
 
         ProcessDefinition processDefinition = 
processService.findProcessDefineById(processInstance.getProcessDefinitionId());
-        if(executeType != ExecuteType.STOP && executeType != 
ExecuteType.PAUSE){
+        if (executeType != ExecuteType.STOP && executeType != 
ExecuteType.PAUSE) {
             result = checkProcessDefinitionValid(processDefinition, 
processInstance.getProcessDefinitionId());
             if (result.get(Constants.STATUS) != Status.SUCCESS) {
                 return result;
@@ -234,7 +253,7 @@ public class ExecutorService extends BaseService{
         if (status != Status.SUCCESS) {
             return checkResult;
         }
-        if (!checkTenantSuitable(processDefinition)){
+        if (!checkTenantSuitable(processDefinition)) {
             logger.error("there is not any valid tenant for the process 
definition: id:{},name:{}, ",
                     processDefinition.getId(), processDefinition.getName());
             putMsg(result, Status.TENANT_NOT_SUITABLE);
@@ -275,6 +294,7 @@ public class ExecutorService extends BaseService{
 
     /**
      * check tenant suitable
+     *
      * @param processDefinition process definition
      * @return true if tenant suitable, otherwise return false
      */
@@ -315,7 +335,7 @@ public class ExecutorService extends BaseService{
                 }
                 break;
             case RECOVER_SUSPENDED_PROCESS:
-                if (executionStatus.typeIsPause()|| 
executionStatus.typeIsCancel()) {
+                if (executionStatus.typeIsPause() || 
executionStatus.typeIsCancel()) {
                     checkResult = true;
                 }
                 break;
@@ -323,7 +343,7 @@ public class ExecutorService extends BaseService{
                 break;
         }
         if (!checkResult) {
-            putMsg(result,Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, 
processInstance.getName(), executionStatus.toString(), executeType.toString());
+            putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, 
processInstance.getName(), executionStatus.toString(), executeType.toString());
         } else {
             putMsg(result, Status.SUCCESS);
         }
@@ -331,7 +351,7 @@ public class ExecutorService extends BaseService{
     }
 
     /**
-     *  prepare to update process instance command type and status
+     * prepare to update process instance command type and status
      *
      * @param processInstance process instance
      * @param commandType command type
@@ -370,11 +390,11 @@ public class ExecutorService extends BaseService{
         command.setCommandType(commandType);
         command.setProcessDefinitionId(processDefinitionId);
         command.setCommandParam(String.format("{\"%s\":%d}",
-                CMDPARAM_RECOVER_PROCESS_ID_STRING, instanceId));
+                CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
         command.setExecutorId(loginUser.getId());
 
-        if(!processService.verifyIsNeedCreateCommand(command)){
-            putMsg(result, 
Status.PROCESS_INSTANCE_EXECUTING_COMMAND,processDefinitionId);
+        if (!processService.verifyIsNeedCreateCommand(command)) {
+            putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, 
processDefinitionId);
             return result;
         }
 
@@ -391,28 +411,29 @@ public class ExecutorService extends BaseService{
 
     /**
      * check if sub processes are offline before starting process definition
+     *
      * @param processDefineId process definition id
      * @return check result code
      */
     public Map<String, Object> startCheckByProcessDefinedId(int 
processDefineId) {
         Map<String, Object> result = new HashMap<>();
 
-        if (processDefineId == 0){
+        if (processDefineId == 0) {
             logger.error("process definition id is null");
-            putMsg(result,Status.REQUEST_PARAMS_NOT_VALID_ERROR,"process 
definition id");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process 
definition id");
         }
         List<Integer> ids = new ArrayList<>();
         processService.recurseFindSubProcessId(processDefineId, ids);
         Integer[] idArray = ids.toArray(new Integer[ids.size()]);
-        if (!ids.isEmpty()){
+        if (!ids.isEmpty()) {
             List<ProcessDefinition> processDefinitionList = 
processDefinitionMapper.queryDefinitionListByIdList(idArray);
-            if (processDefinitionList != null){
-                for (ProcessDefinition processDefinition : 
processDefinitionList){
+            if (processDefinitionList != null) {
+                for (ProcessDefinition processDefinition : 
processDefinitionList) {
                     /**
                      * if there is no online process, exit directly
                      */
-                    if (processDefinition.getReleaseState() != 
ReleaseState.ONLINE){
-                        putMsg(result,Status.PROCESS_DEFINE_NOT_RELEASE, 
processDefinition.getName());
+                    if (processDefinition.getReleaseState() != 
ReleaseState.ONLINE) {
+                        putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, 
processDefinition.getName());
                         logger.info("not release process definition id: {} , 
name : {}",
                                 processDefinition.getId(), 
processDefinition.getName());
                         return result;
@@ -431,13 +452,13 @@ public class ExecutorService extends BaseService{
      * @param processInstanceId process instance id
      * @return receivers cc list
      */
-    public Map<String, Object> getReceiverCc(Integer processDefineId,Integer 
processInstanceId) {
+    public Map<String, Object> getReceiverCc(Integer processDefineId, Integer 
processInstanceId) {
         Map<String, Object> result = new HashMap<>();
-        logger.info("processInstanceId {}",processInstanceId);
-        if(processDefineId == null && processInstanceId == null){
+        logger.info("processInstanceId {}", processInstanceId);
+        if (processDefineId == null && processInstanceId == null) {
             throw new RuntimeException("You must set values for parameters 
processDefineId or processInstanceId");
         }
-        if(processDefineId == null && processInstanceId != null) {
+        if (processDefineId == null && processInstanceId != null) {
             ProcessInstance processInstance = 
processInstanceMapper.selectById(processInstanceId);
             if (processInstance == null) {
                 throw new RuntimeException("processInstanceId is not exists");
@@ -445,24 +466,24 @@ public class ExecutorService extends BaseService{
             processDefineId = processInstance.getProcessDefinitionId();
         }
         ProcessDefinition processDefinition = 
processDefinitionMapper.selectById(processDefineId);
-        if (processDefinition == null){
-            throw new RuntimeException(String.format("processDefineId %d is 
not exists",processDefineId));
+        if (processDefinition == null) {
+            throw new RuntimeException(String.format("processDefineId %d is 
not exists", processDefineId));
         }
 
         String receivers = processDefinition.getReceivers();
         String receiversCc = processDefinition.getReceiversCc();
-        Map<String,String> dataMap = new HashMap<>();
-        dataMap.put(Constants.RECEIVERS,receivers);
-        dataMap.put(Constants.RECEIVERS_CC,receiversCc);
+        Map<String, String> dataMap = new HashMap<>();
+        dataMap.put(Constants.RECEIVERS, receivers);
+        dataMap.put(Constants.RECEIVERS_CC, receiversCc);
 
         result.put(Constants.DATA_LIST, dataMap);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
-
     /**
      * create command
+     *
      * @param commandType commandType
      * @param processDefineId processDefineId
      * @param nodeDep nodeDep
@@ -476,37 +497,36 @@ public class ExecutorService extends BaseService{
      * @param processInstancePriority processInstancePriority
      * @param workerGroup workerGroup
      * @return command id
-     * @throws ParseException
      */
     private int createCommand(CommandType commandType, int processDefineId,
                               TaskDependType nodeDep, FailureStrategy 
failureStrategy,
                               String startNodeList, String schedule, 
WarningType warningType,
                               int executorId, int warningGroupId,
-                              RunMode runMode,Priority 
processInstancePriority, String workerGroup) throws ParseException {
+                              RunMode runMode, Priority 
processInstancePriority, String workerGroup) throws ParseException {
 
         /**
          * instantiate command schedule instance
          */
         Command command = new Command();
 
-        Map<String,String> cmdParam = new HashMap<>();
-        if(commandType == null){
+        Map<String, String> cmdParam = new HashMap<>();
+        if (commandType == null) {
             command.setCommandType(CommandType.START_PROCESS);
-        }else{
+        } else {
             command.setCommandType(commandType);
         }
         command.setProcessDefinitionId(processDefineId);
-        if(nodeDep != null){
+        if (nodeDep != null) {
             command.setTaskDependType(nodeDep);
         }
-        if(failureStrategy != null){
+        if (failureStrategy != null) {
             command.setFailureStrategy(failureStrategy);
         }
 
-        if(StringUtils.isNotEmpty(startNodeList)){
-            cmdParam.put(CMDPARAM_START_NODE_NAMES, startNodeList);
+        if (StringUtils.isNotEmpty(startNodeList)) {
+            cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
         }
-        if(warningType != null){
+        if (warningType != null) {
             command.setWarningType(warningType);
         }
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
@@ -517,32 +537,32 @@ public class ExecutorService extends BaseService{
 
         Date start = null;
         Date end = null;
-        if(StringUtils.isNotEmpty(schedule)){
+        if (StringUtils.isNotEmpty(schedule)) {
             String[] interval = schedule.split(",");
-            if(interval.length == 2){
+            if (interval.length == 2) {
                 start = DateUtils.getScheduleDate(interval[0]);
                 end = DateUtils.getScheduleDate(interval[1]);
             }
         }
 
         // determine whether to complement
-        if(commandType == CommandType.COMPLEMENT_DATA){
+        if (commandType == CommandType.COMPLEMENT_DATA) {
             runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
-            if(null != start && null != end && !start.after(end)){
-                if(runMode == RunMode.RUN_MODE_SERIAL){
+            if (null != start && null != end && !start.after(end)) {
+                if (runMode == RunMode.RUN_MODE_SERIAL) {
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, 
DateUtils.dateToString(start));
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(end));
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     return processService.createCommand(command);
-                }else if (runMode == RunMode.RUN_MODE_PARALLEL){
+                } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
                     List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
                     List<Date> listDate = new LinkedList<>();
-                    if(!CollectionUtils.isEmpty(schedules)){
+                    if (!CollectionUtils.isEmpty(schedules)) {
                         for (Schedule item : schedules) {
                             
listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
                         }
                     }
-                    if(!CollectionUtils.isEmpty(listDate)){
+                    if (!CollectionUtils.isEmpty(listDate)) {
                         // loop by schedule date
                         for (Date date : listDate) {
                             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, 
DateUtils.dateToString(date));
@@ -551,10 +571,10 @@ public class ExecutorService extends BaseService{
                             processService.createCommand(command);
                         }
                         return listDate.size();
-                    }else{
+                    } else {
                         // loop by day
                         int runCunt = 0;
-                        while(!start.after(end)) {
+                        while (!start.after(end)) {
                             runCunt += 1;
                             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, 
DateUtils.dateToString(start));
                             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(start));
@@ -565,11 +585,11 @@ public class ExecutorService extends BaseService{
                         return runCunt;
                     }
                 }
-            }else{
+            } else {
                 logger.error("there is not valid schedule date for the process 
definition: id:{},date:{}",
                         processDefineId, schedule);
             }
-        }else{
+        } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
             return processService.createCommand(command);
         }
@@ -579,11 +599,6 @@ public class ExecutorService extends BaseService{
 
     /**
      * check result and auth
-     *
-     * @param loginUser
-     * @param projectName
-     * @param project
-     * @return
      */
     private Map<String, Object> checkResultAndAuth(User loginUser, String 
projectName, Project project) {
         // check project auth
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e86ea83..e4b01e4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
 
 import org.apache.dolphinscheduler.api.dto.ProcessMeta;
 import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@@ -78,7 +78,6 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -86,7 +85,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -161,13 +159,14 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
      * @return create result code
      * @throws JsonProcessingException JsonProcessingException
      */
+    @Override
     public Map<String, Object> createProcessDefinition(User loginUser,
                                                        String projectName,
                                                        String name,
                                                        String 
processDefinitionJson,
                                                        String desc,
                                                        String locations,
-                                                       String connects)  {
+                                                       String connects) {
 
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByName(projectName);
@@ -230,6 +229,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
 
     /**
      * get resource ids
+     *
      * @param processData process data
      * @return resource ids
      */
@@ -264,6 +264,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
         }
         return sb.toString();
     }
+
     /**
      * query process definition list
      *
@@ -271,6 +272,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param projectName project name
      * @return definition list
      */
+    @Override
     public Map<String, Object> queryProcessDefinitionList(User loginUser, 
String projectName) {
 
         HashMap<String, Object> result = new HashMap<>(5);
@@ -300,6 +302,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param userId user id
      * @return process definition page
      */
+    @Override
     public Map<String, Object> queryProcessDefinitionListPaging(User 
loginUser, String projectName, String searchVal, Integer pageNo, Integer 
pageSize, Integer userId) {
 
         Map<String, Object> result = new HashMap<>();
@@ -332,6 +335,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param processId process definition id
      * @return process definition detail
      */
+    @Override
     public Map<String, Object> queryProcessDefinitionById(User loginUser, 
String projectName, Integer processId) {
 
         Map<String, Object> result = new HashMap<>();
@@ -366,6 +370,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param connects connects for nodes
      * @return update result code
      */
+    @Override
     public Map<String, Object> updateProcessDefinition(User loginUser,
                                                        String projectName,
                                                        int id,
@@ -455,6 +460,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param name name
      * @return true if process definition name not exists, otherwise false
      */
+    @Override
     public Map<String, Object> verifyProcessDefinitionName(User loginUser, 
String projectName, String name) {
 
         Map<String, Object> result = new HashMap<>();
@@ -482,6 +488,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param processDefinitionId process definition id
      * @return delete result code
      */
+    @Override
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> deleteProcessDefinitionById(User loginUser, 
String projectName, Integer processDefinitionId) {
 
@@ -513,9 +520,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
             return result;
         }
         // check process instances is already running
-        List<ProcessInstance> processInstances =  
processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, 
Constants.NOT_TERMINATED_STATES);
+        List<ProcessInstance> processInstances = 
processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, 
Constants.NOT_TERMINATED_STATES);
         if (CollectionUtils.isNotEmpty(processInstances)) {
-            putMsg(result, 
Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL,processInstances.size());
+            putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, 
processInstances.size());
             return result;
         }
 
@@ -554,6 +561,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param releaseState release state
      * @return release result code
      */
+    @Override
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> releaseProcessDefinition(User loginUser, String 
projectName, int id, int releaseState) {
         HashMap<String, Object> result = new HashMap<>();
@@ -621,6 +629,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
     /**
      * batch export process definition by ids
      */
+    @Override
     public void batchExportProcessDefinitionByIds(User loginUser, String 
projectName, String processDefinitionIds, HttpServletResponse response) {
 
         if (StringUtils.isEmpty(processDefinitionIds)) {
@@ -699,6 +708,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
 
     /**
      * get export process metadata string
+     *
      * @param processDefinitionId process definition id
      * @param processDefinition process definition
      * @return export process metadata string
@@ -790,6 +800,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param currentProjectName current project name
      * @return import process
      */
+    @Override
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> importProcessDefinition(User loginUser, 
MultipartFile file, String currentProjectName) {
         Map<String, Object> result = new HashMap<>(5);
@@ -1123,6 +1134,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param processDefinitionJson process definition json
      * @return check result code
      */
+    @Override
     public Map<String, Object> checkProcessNodeList(ProcessData processData, 
String processDefinitionJson) {
 
         Map<String, Object> result = new HashMap<>();
@@ -1174,6 +1186,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param defineId define id
      * @return task node list
      */
+    @Override
     public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) 
{
         Map<String, Object> result = new HashMap<>();
 
@@ -1210,6 +1223,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param defineIdList define id list
      * @return task node list
      */
+    @Override
     public Map<String, Object> getTaskNodeListByDefinitionIdList(String 
defineIdList) {
         Map<String, Object> result = new HashMap<>();
 
@@ -1247,6 +1261,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @param projectId project id
      * @return process definitions in the project
      */
+    @Override
     public Map<String, Object> queryProcessDefinitionAllByProjectId(Integer 
projectId) {
 
         HashMap<String, Object> result = new HashMap<>(5);
@@ -1266,6 +1281,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
      * @return tree view json data
      * @throws Exception exception
      */
+    @Override
     public Map<String, Object> viewTree(Integer processId, Integer limit) 
throws Exception {
         Map<String, Object> result = new HashMap<>();
 
@@ -1350,7 +1366,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseService implements
                             String taskJson = taskInstance.getTaskJson();
                             taskNode = JSONUtils.parseObject(taskJson, 
TaskNode.class);
                             subProcessId = 
Integer.parseInt(JSONUtils.parseObject(
-                                    
taskNode.getParams()).path(CMDPARAM_SUB_PROCESS_DEFINE_ID).asText());
+                                    
taskNode.getParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
                         }
                         treeViewDto.getInstances().add(new 
Instance(taskInstance.getId(), taskInstance.getName(), 
taskInstance.getTaskType(), taskInstance.getState().toString()
                                 , taskInstance.getStartTime(), 
taskInstance.getEndTime(), taskInstance.getHost(), 
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), 
subProcessId));
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
index a4c0c6b..9f8d1b2 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.api.service;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -137,9 +138,31 @@ public class ExecutorService2Test {
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, times(1)).createCommand(any(Command.class));
         } catch (Exception e) {
+            //ignore
+        }
+    }
+
+    /**
+     * start process (not complement) with the start node list "n1,n2"
+     */
+    @Test
+    public void testComplementWithStartNodeList() throws ParseException {
+        try {
+            
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+            Map<String, Object> result = 
executorService.execProcessInstance(loginUser, projectName,
+                    processDefinitionId, cronTime, CommandType.START_PROCESS,
+                    null, "n1,n2",
+                    null, null, 0,
+                    "", "", RunMode.RUN_MODE_SERIAL,
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+            Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+            verify(processService, times(1)).createCommand(any(Command.class));
+        } catch (Exception e) {
+            //ignore
         }
     }
 
+
     /**
      * date error
      */
@@ -156,6 +179,7 @@ public class ExecutorService2Test {
             Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, 
result.get(Constants.STATUS));
             verify(processService, times(0)).createCommand(any(Command.class));
         } catch (Exception e) {
+            //ignore
         }
     }
 
@@ -175,6 +199,7 @@ public class ExecutorService2Test {
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, times(1)).createCommand(any(Command.class));
         } catch (Exception e) {
+            //ignore
         }
     }
 
@@ -194,6 +219,7 @@ public class ExecutorService2Test {
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, 
times(31)).createCommand(any(Command.class));
         } catch (Exception e) {
+            //ignore
         }
     }
 
@@ -213,10 +239,10 @@ public class ExecutorService2Test {
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, 
times(15)).createCommand(any(Command.class));
         } catch (Exception e) {
+            //ignore
         }
     }
 
-
     @Test
     public void testNoMsterServers() throws ParseException {
         Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new 
ArrayList<Server>());
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 55d7b09..e197e56 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -18,6 +18,8 @@
 package org.apache.dolphinscheduler.api.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 import org.apache.dolphinscheduler.api.dto.ProcessMeta;
 import org.apache.dolphinscheduler.api.enums.Status;
@@ -66,6 +68,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -83,33 +88,6 @@ import 
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 @RunWith(MockitoJUnitRunner.class)
 public class ProcessDefinitionServiceTest {
 
-    @InjectMocks
-    private ProcessDefinitionServiceImpl processDefinitionService;
-
-    @Mock
-    private ProcessDefinitionMapper processDefineMapper;
-
-    @Mock
-    private ProjectMapper projectMapper;
-
-    @Mock
-    private ProjectServiceImpl projectService;
-
-    @Mock
-    private ScheduleMapper scheduleMapper;
-
-    @Mock
-    private ProcessService processService;
-
-    @Mock
-    private ProcessInstanceService processInstanceService;
-
-    @Mock
-    private TaskInstanceMapper taskInstanceMapper;
-
-    @Mock
-    private ProcessDefinitionVersionService processDefinitionVersionService;
-
     private static final String SHELL_JSON = "{\n"
             + "    \"globalParams\": [\n"
             + "        \n"
@@ -150,7 +128,6 @@ public class ProcessDefinitionServiceTest {
             + "    \"tenantId\": 1,\n"
             + "    \"timeout\": 0\n"
             + "}";
-
     private static final String CYCLE_SHELL_JSON = "{\n"
             + "    \"globalParams\": [\n"
             + "        \n"
@@ -253,6 +230,24 @@ public class ProcessDefinitionServiceTest {
             + "    \"tenantId\": 1,\n"
             + "    \"timeout\": 0\n"
             + "}";
+    @InjectMocks
+    private ProcessDefinitionServiceImpl processDefinitionService;
+    @Mock
+    private ProcessDefinitionMapper processDefineMapper;
+    @Mock
+    private ProjectMapper projectMapper;
+    @Mock
+    private ProjectServiceImpl projectService;
+    @Mock
+    private ScheduleMapper scheduleMapper;
+    @Mock
+    private ProcessService processService;
+    @Mock
+    private ProcessInstanceService processInstanceService;
+    @Mock
+    private TaskInstanceMapper taskInstanceMapper;
+    @Mock
+    private ProcessDefinitionVersionService processDefinitionVersionService;
 
     @Test
     public void testQueryProcessDefinitionList() {
@@ -751,6 +746,70 @@ public class ProcessDefinitionServiceTest {
         
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(),
 "shell-1")).thenReturn(taskInstance);
         Map<String, Object> taskNotNuLLRes = 
processDefinitionService.viewTree(46, 10);
         Assert.assertEquals(Status.SUCCESS, 
taskNotNuLLRes.get(Constants.STATUS));
+
+    }
+
+    @Test
+    public void testSubProcessViewTree() throws Exception {
+
+        ProcessDefinition processDefinition = getProcessDefinition();
+        processDefinition.setProcessDefinitionJson(SHELL_JSON);
+        List<ProcessInstance> processInstanceList = new ArrayList<>();
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setName("test_instance");
+        processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        processInstance.setHost("192.168.xx.xx");
+        processInstance.setStartTime(new Date());
+        processInstance.setEndTime(new Date());
+        processInstanceList.add(processInstance);
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setStartTime(new Date());
+        taskInstance.setEndTime(new Date());
+        taskInstance.setTaskType("SUB_PROCESS");
+        taskInstance.setId(1);
+        taskInstance.setName("test_task_instance");
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        taskInstance.setHost("192.168.xx.xx");
+        taskInstance.setTaskJson("{\n"
+                + "  \"conditionResult\": {\n"
+                + "    \"failedNode\": [\n"
+                + "      \"\"\n"
+                + "    ],\n"
+                + "    \"successNode\": [\n"
+                + "      \"\"\n"
+                + "    ]\n"
+                + "  },\n"
+                + "  \"delayTime\": \"0\",\n"
+                + "  \"dependence\": {},\n"
+                + "  \"description\": \"\",\n"
+                + "  \"id\": \"1\",\n"
+                + "  \"maxRetryTimes\": \"0\",\n"
+                + "  \"name\": \"test_task_instance\",\n"
+                + "  \"params\": {\n"
+                + "    \"processDefinitionId\": \"222\",\n"
+                + "    \"resourceList\": []\n"
+                + "  },\n"
+                + "  \"preTasks\": [],\n"
+                + "  \"retryInterval\": \"1\",\n"
+                + "  \"runFlag\": \"NORMAL\",\n"
+                + "  \"taskInstancePriority\": \"MEDIUM\",\n"
+                + "  \"timeout\": {\n"
+                + "    \"enable\": false,\n"
+                + "    \"interval\": null,\n"
+                + "    \"strategy\": \"\"\n"
+                + "  },\n"
+                + "  \"type\": \"SUB_PROCESS\",\n"
+                + "  \"workerGroup\": \"default\"\n"
+                + "}");
+        //task instance exist
+        
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+        Mockito.when(processInstanceService.queryByProcessDefineId(46, 
10)).thenReturn(processInstanceList);
+        
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(),
 "shell-1")).thenReturn(taskInstance);
+        Map<String, Object> taskNotNuLLRes = 
processDefinitionService.viewTree(46, 10);
+        Assert.assertEquals(Status.SUCCESS, 
taskNotNuLLRes.get(Constants.STATUS));
+
     }
 
     @Test
@@ -973,7 +1032,7 @@ public class ProcessDefinitionServiceTest {
     }
 
     @Test
-    public void testBatchExportProcessDefinitionByIds() {
+    public void testBatchExportProcessDefinitionByIds() throws IOException {
         processDefinitionService.batchExportProcessDefinitionByIds(
                 null, null, null, null);
 
@@ -991,6 +1050,28 @@ public class ProcessDefinitionServiceTest {
 
         processDefinitionService.batchExportProcessDefinitionByIds(
                 loginUser, projectName, "1", null);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(1);
+        
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+                + 
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+                + 
",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+                + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo 
\\\"123123\\\"\",\"resourceList\":[]}"
+                + 
",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+                + 
",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+                + 
",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+        Map<String, Object> checkResult = new HashMap<>();
+        checkResult.put(Constants.STATUS, Status.SUCCESS);
+        
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectName)).thenReturn(checkResult);
+        
Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
+        HttpServletResponse response = mock(HttpServletResponse.class);
+
+        ServletOutputStream outputStream = mock(ServletOutputStream.class);
+        when(response.getOutputStream()).thenReturn(outputStream);
+        processDefinitionService.batchExportProcessDefinitionByIds(
+                loginUser, projectName, "1", response);
+
     }
 
     @Test
@@ -1182,4 +1263,35 @@ public class ProcessDefinitionServiceTest {
             result.put(Constants.MSG, status.getMsg());
         }
     }
+
+    @Test
+    public void testExportProcessMetaData() {
+        Integer processDefinitionId = 111;
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(processDefinitionId);
+        
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+                + "{\"failedNode\":[\"\"],\"successNode\":"
+                + "[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+                + 
"\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\","
+                + "\"params\":{\"localParams\":[],\"rawScript\":\"echo 
\\\"123123\\\"\",\"resourceList\":[]},"
+                + 
"\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+                + 
"\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\","
+                + 
"\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+        
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinitionId,
 processDefinition));
+    }
+
+    @Test
+    public void testImportProcessSchedule() {
+        User loginUser = new User();
+        loginUser.setId(1);
+        loginUser.setUserType(UserType.ADMIN_USER);
+        Integer processDefinitionId = 111;
+        String processDefinitionName = "testProcessDefinition";
+        String projectName = "project_test1";
+        Map<String, Object> result = new HashMap<>();
+        putMsg(result, Status.PROJECT_NOT_FOUNT);
+        ProcessMeta processMeta = new ProcessMeta();
+        Assert.assertEquals(0, 
processDefinitionService.importProcessSchedule(loginUser, projectName, 
processMeta, processDefinitionName, processDefinitionId));
+    }
+
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 4a696d2..e473428 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -442,21 +442,21 @@ public final class Constants {
     /**
      * command parameter keys
      */
-    public static final String CMDPARAM_RECOVER_PROCESS_ID_STRING = 
"ProcessInstanceId";
+    public static final String CMD_PARAM_RECOVER_PROCESS_ID_STRING = 
"ProcessInstanceId";
 
-    public static final String CMDPARAM_RECOVERY_START_NODE_STRING = 
"StartNodeIdList";
+    public static final String CMD_PARAM_RECOVERY_START_NODE_STRING = 
"StartNodeIdList";
 
-    public static final String CMDPARAM_RECOVERY_WAITTING_THREAD = 
"WaittingThreadInstanceId";
+    public static final String CMD_PARAM_RECOVERY_WAITING_THREAD = 
"WaitingThreadInstanceId";
 
-    public static final String CMDPARAM_SUB_PROCESS = "processInstanceId";
+    public static final String CMD_PARAM_SUB_PROCESS = "processInstanceId";
 
-    public static final String CMDPARAM_EMPTY_SUB_PROCESS = "0";
+    public static final String CMD_PARAM_EMPTY_SUB_PROCESS = "0";
 
-    public static final String CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID = 
"parentProcessInstanceId";
+    public static final String CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID = 
"parentProcessInstanceId";
 
-    public static final String CMDPARAM_SUB_PROCESS_DEFINE_ID = 
"processDefinitionId";
+    public static final String CMD_PARAM_SUB_PROCESS_DEFINE_ID = 
"processDefinitionId";
 
-    public static final String CMDPARAM_START_NODE_NAMES = "StartNodeNameList";
+    public static final String CMD_PARAM_START_NODE_NAMES = 
"StartNodeNameList";
 
     /**
      * complement data start date
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 2bc8031..66de086 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master.runner;
 
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVERY_START_NODE_STRING;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_START_NODE_NAMES;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static 
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
 
@@ -35,7 +36,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -87,22 +87,18 @@ public class MasterExecThread implements Runnable {
      * logger of MasterExecThread
      */
     private static final Logger logger = 
LoggerFactory.getLogger(MasterExecThread.class);
-
-    /**
-     * process instance
-     */
-    private ProcessInstance processInstance;
-
     /**
-     *  runing TaskNode
+     * running TaskNode
      */
-    private final Map<MasterBaseTaskExecThread,Future<Boolean>> activeTaskNode 
= new ConcurrentHashMap<>();
-
+    private final Map<MasterBaseTaskExecThread, Future<Boolean>> 
activeTaskNode = new ConcurrentHashMap<>();
     /**
      * task exec service
      */
     private final ExecutorService taskExecService;
-
+    /**
+     * process instance
+     */
+    private ProcessInstance processInstance;
     /**
      * submit failure nodes
      */
@@ -116,7 +112,7 @@ public class MasterExecThread implements Runnable {
     /**
      * error task list
      */
-    private Map<String,TaskInstance> errorTaskList = new ConcurrentHashMap<>();
+    private Map<String, TaskInstance> errorTaskList = new 
ConcurrentHashMap<>();
 
     /**
      * complete task list
@@ -159,7 +155,7 @@ public class MasterExecThread implements Runnable {
     private DAG<String, TaskNode, TaskNodeRelation> dag;
 
     /**
-     *  process service
+     * process service
      */
     private ProcessService processService;
 
@@ -172,9 +168,16 @@ public class MasterExecThread implements Runnable {
      *
      */
     private NettyRemotingClient nettyRemotingClient;
+    /**
+     * map of property name to value
+     *
+     * NOTE(review): how this map is populated is not visible here - confirm before relying on it
+     */
+    private Map<String, Object> propToValue = new ConcurrentHashMap<String, 
Object>();
 
     /**
      * constructor of MasterExecThread
+     *
      * @param processInstance processInstance
      * @param processService processService
      * @param nettyRemotingClient nettyRemotingClient
@@ -195,39 +198,36 @@ public class MasterExecThread implements Runnable {
         this.alertManager = alertManager;
     }
 
-
-
-
     @Override
     public void run() {
 
         // process instance is null
-        if (processInstance == null){
+        if (processInstance == null) {
             logger.info("process instance is not exists");
             return;
         }
 
         // check to see if it's done
-        if (processInstance.getState().typeIsFinished()){
-            logger.info("process instance is done : 
{}",processInstance.getId());
+        if (processInstance.getState().typeIsFinished()) {
+            logger.info("process instance is done : {}", 
processInstance.getId());
             return;
         }
 
         try {
-            if (processInstance.isComplementData() &&  Flag.NO == 
processInstance.getIsSubProcess()){
+            if (processInstance.isComplementData() && Flag.NO == 
processInstance.getIsSubProcess()) {
                 // sub process complement data
                 executeComplementProcess();
-            }else{
+            } else {
                 // execute flow
                 executeProcess();
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("master exec thread exception", e);
             logger.error("process execute failed, process id:{}", 
processInstance.getId());
             processInstance.setState(ExecutionStatus.FAILURE);
             processInstance.setEndTime(new Date());
             processService.updateProcessInstance(processInstance);
-        }finally {
+        } finally {
             taskExecService.shutdown();
             // post handle
             postHandle();
@@ -236,6 +236,7 @@ public class MasterExecThread implements Runnable {
 
     /**
      * execute process
+     *
      * @throws Exception exception
      */
     private void executeProcess() throws Exception {
@@ -246,6 +247,7 @@ public class MasterExecThread implements Runnable {
 
     /**
      * execute complement process
+     *
      * @throws Exception exception
      */
     private void executeComplementProcess() throws Exception {
@@ -260,7 +262,7 @@ public class MasterExecThread implements Runnable {
         int processDefinitionId = processInstance.getProcessDefinitionId();
         List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
         List<Date> listDate = Lists.newLinkedList();
-        if(!CollectionUtils.isEmpty(schedules)){
+        if (!CollectionUtils.isEmpty(schedules)) {
             for (Schedule schedule : schedules) {
                 listDate.addAll(CronUtils.getSelfFireDateList(startDate, 
endDate, schedule.getCrontab()));
             }
@@ -268,26 +270,26 @@ public class MasterExecThread implements Runnable {
         // get first fire date
         Iterator<Date> iterator = null;
         Date scheduleDate = null;
-        if(!CollectionUtils.isEmpty(listDate)) {
+        if (!CollectionUtils.isEmpty(listDate)) {
             iterator = listDate.iterator();
             scheduleDate = iterator.next();
             processInstance.setScheduleTime(scheduleDate);
             processService.updateProcessInstance(processInstance);
-        }else{
+        } else {
             scheduleDate = processInstance.getScheduleTime();
-            if(scheduleDate == null){
+            if (scheduleDate == null) {
                 scheduleDate = startDate;
             }
         }
 
-        while(Stopper.isRunning()){
+        while (Stopper.isRunning()) {
 
             logger.info("process {} start to complement {} data",
                     processInstance.getId(), 
DateUtils.dateToString(scheduleDate));
             // prepare dag and other info
             prepareProcess();
 
-            if(dag == null){
+            if (dag == null) {
                 logger.error("process {} dag is null, please check out 
parameters",
                         processInstance.getId());
                 processInstance.setState(ExecutionStatus.SUCCESS);
@@ -300,23 +302,23 @@ public class MasterExecThread implements Runnable {
 
             endProcess();
             // process instance failure ,no more complements
-            if(!processInstance.getState().typeIsSuccess()){
+            if (!processInstance.getState().typeIsSuccess()) {
                 logger.info("process {} state {}, complement not completely!",
                         processInstance.getId(), processInstance.getState());
                 break;
             }
             //  current process instance success ,next execute
-            if(null == iterator){
+            if (null == iterator) {
                 // loop by day
                 scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
-                if(scheduleDate.after(endDate)){
+                if (scheduleDate.after(endDate)) {
                     // all success
                     logger.info("process {} complement completely!", 
processInstance.getId());
                     break;
                 }
-            }else{
+            } else {
                 // loop by schedule date
-                if(!iterator.hasNext()){
+                if (!iterator.hasNext()) {
                     // all success
                     logger.info("process {} complement completely!", 
processInstance.getId());
                     break;
@@ -326,8 +328,8 @@ public class MasterExecThread implements Runnable {
             // flow end
             // execute next process instance complement data
             processInstance.setScheduleTime(scheduleDate);
-            
if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
-                cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+            if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
+                
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
             }
 
@@ -343,9 +345,9 @@ public class MasterExecThread implements Runnable {
         }
     }
 
-
     /**
      * prepare process parameter
+     *
      * @throws Exception exception
      */
     private void prepareProcess() throws Exception {
@@ -358,23 +360,22 @@ public class MasterExecThread implements Runnable {
         logger.info("prepare process :{} end", processInstance.getId());
     }
 
-
     /**
      * process end handle
      */
     private void endProcess() {
         processInstance.setEndTime(new Date());
         processService.updateProcessInstance(processInstance);
-        if(processInstance.getState().typeIsWaitingThread()){
+        if (processInstance.getState().typeIsWaitingThread()) {
             processService.createRecoveryWaitingThreadCommand(null, 
processInstance);
         }
         List<TaskInstance> taskInstances = 
processService.findValidTaskListByProcessId(processInstance.getId());
         alertManager.sendAlertProcessInstance(processInstance, taskInstances);
     }
 
-
     /**
-     *  generate process dag
+     * generate process dag
+     *
      * @throws Exception exception
      */
     private void buildFlowDag() throws Exception {
@@ -386,7 +387,7 @@ public class MasterExecThread implements Runnable {
         List<String> startNodeNameList = 
parseStartNodeName(processInstance.getCommandParam());
         ProcessDag processDag = 
generateFlowDag(processInstance.getProcessInstanceJson(),
                 startNodeNameList, recoveryNameList, 
processInstance.getTaskDependType());
-        if(processDag == null){
+        if (processDag == null) {
             logger.error("processDag is null");
             return;
         }
@@ -397,7 +398,7 @@ public class MasterExecThread implements Runnable {
     /**
      * init task queue
      */
-    private void initTaskQueue(){
+    private void initTaskQueue() {
 
         taskFailedSubmit = false;
         activeTaskNode.clear();
@@ -405,14 +406,14 @@ public class MasterExecThread implements Runnable {
         completeTaskList.clear();
         errorTaskList.clear();
         List<TaskInstance> taskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
-        for(TaskInstance task : taskInstanceList){
-            if(task.isTaskComplete()){
+        for (TaskInstance task : taskInstanceList) {
+            if (task.isTaskComplete()) {
                 completeTaskList.put(task.getName(), task);
             }
-            if(task.isConditionsTask() || 
DagHelper.haveConditionsAfterNode(task.getName(), dag)){
+            if (task.isConditionsTask() || 
DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
                 continue;
             }
-            if(task.getState().typeIsFailure() && !task.taskCanRetry()){
+            if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
                 errorTaskList.put(task.getName(), task);
             }
         }
@@ -441,18 +442,19 @@ public class MasterExecThread implements Runnable {
 
     /**
      * submit task to execute
+     *
      * @param taskInstance task instance
      * @return TaskInstance
      */
     private TaskInstance submitTaskExec(TaskInstance taskInstance) {
         MasterBaseTaskExecThread abstractExecThread = null;
-        if(taskInstance.isSubProcess()){
+        if (taskInstance.isSubProcess()) {
             abstractExecThread = new SubProcessTaskExecThread(taskInstance);
-        }else if(taskInstance.isDependTask()){
+        } else if (taskInstance.isDependTask()) {
             abstractExecThread = new DependentTaskExecThread(taskInstance);
-        }else if(taskInstance.isConditionsTask()){
+        } else if (taskInstance.isConditionsTask()) {
             abstractExecThread = new ConditionsTaskExecThread(taskInstance);
-        }else {
+        } else {
             abstractExecThread = new MasterTaskExecThread(taskInstance);
         }
         Future<Boolean> future = taskExecService.submit(abstractExecThread);
@@ -463,13 +465,14 @@ public class MasterExecThread implements Runnable {
     /**
      * find task instance in db.
      * in case several tasks with the same name are submitted at the same time.
+     *
      * @param taskName task name
      * @return TaskInstance
      */
-    private TaskInstance findTaskIfExists(String taskName){
+    private TaskInstance findTaskIfExists(String taskName) {
         List<TaskInstance> taskInstanceList = 
processService.findValidTaskListByProcessId(this.processInstance.getId());
-        for(TaskInstance taskInstance : taskInstanceList){
-            if(taskInstance.getName().equals(taskName)){
+        for (TaskInstance taskInstance : taskInstanceList) {
+            if (taskInstance.getName().equals(taskName)) {
                 return taskInstance;
             }
         }
@@ -478,15 +481,16 @@ public class MasterExecThread implements Runnable {
 
     /**
      * encapsulation task
-     * @param processInstance   process instance
-     * @param nodeName          node name
+     *
+     * @param processInstance process instance
+     * @param nodeName node name
      * @return TaskInstance
      */
     private TaskInstance createTaskInstance(ProcessInstance processInstance, 
String nodeName,
                                             TaskNode taskNode) {
 
         TaskInstance taskInstance = findTaskIfExists(nodeName);
-        if(taskInstance == null){
+        if (taskInstance == null) {
             taskInstance = new TaskInstance();
             // task name
             taskInstance.setName(nodeName);
@@ -519,9 +523,9 @@ public class MasterExecThread implements Runnable {
             taskInstance.setRetryInterval(taskNode.getRetryInterval());
 
             // task instance priority
-            if(taskNode.getTaskInstancePriority() == null){
+            if (taskNode.getTaskInstancePriority() == null) {
                 taskInstance.setTaskInstancePriority(Priority.MEDIUM);
-            }else{
+            } else {
                 
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
             }
 
@@ -530,7 +534,7 @@ public class MasterExecThread implements Runnable {
             String taskWorkerGroup = 
StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : 
taskNode.getWorkerGroup();
             if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && 
taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
                 taskInstance.setWorkerGroup(processWorkerGroup);
-            }else {
+            } else {
                 taskInstance.setWorkerGroup(taskWorkerGroup);
             }
 
@@ -540,15 +544,10 @@ public class MasterExecThread implements Runnable {
         return taskInstance;
     }
 
-    /**
-     * submit post node
-     * @param parentNodeName parent node name
-     */
-    private Map<String,Object> propToValue = new ConcurrentHashMap<String, 
Object>();
-    private void submitPostNode(String parentNodeName){
+    private void submitPostNode(String parentNodeName) {
         Set<String> submitTaskNodeList = 
DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, 
completeTaskList);
         List<TaskInstance> taskInstances = new ArrayList<>();
-        for(String taskNode : submitTaskNodeList){
+        for (String taskNode : submitTaskNodeList) {
             try {
                 VarPoolUtils.convertVarPoolToMap(propToValue, 
processInstance.getVarPool());
             } catch (ParseException e) {
@@ -558,23 +557,23 @@ public class MasterExecThread implements Runnable {
             TaskNode taskNodeObject = dag.getNode(taskNode);
             VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
             taskInstances.add(createTaskInstance(processInstance, taskNode,
-                taskNodeObject));
+                    taskNodeObject));
         }
 
         // if previous node success , post node submit
-        for(TaskInstance task : taskInstances){
+        for (TaskInstance task : taskInstances) {
 
-            if(readyToSubmitTaskList.containsKey(task.getName())){
+            if (readyToSubmitTaskList.containsKey(task.getName())) {
                 continue;
             }
 
-            if(completeTaskList.containsKey(task.getName())){
+            if (completeTaskList.containsKey(task.getName())) {
                 logger.info("task {} has already run success", task.getName());
                 continue;
             }
-            if(task.getState().typeIsPause() || 
task.getState().typeIsCancel()){
+            if (task.getState().typeIsPause() || 
task.getState().typeIsCancel()) {
                 logger.info("task {} stopped, the state is {}", 
task.getName(), task.getState());
-            }else{
+            } else {
                 addTaskToStandByList(task);
             }
         }
@@ -582,36 +581,37 @@ public class MasterExecThread implements Runnable {
 
     /**
      * determine whether the dependencies of the task node are complete
+     *
      * @return DependResult
      */
     private DependResult isTaskDepsComplete(String taskName) {
 
         Collection<String> startNodes = dag.getBeginNode();
         // if the task is a start node, return SUCCESS directly
-        if(startNodes.contains(taskName)){
+        if (startNodes.contains(taskName)) {
             return DependResult.SUCCESS;
         }
         TaskNode taskNode = dag.getNode(taskName);
         List<String> depNameList = taskNode.getDepList();
-        for(String depsNode : depNameList ){
-            if(!dag.containsNode(depsNode)
+        for (String depsNode : depNameList) {
+            if (!dag.containsNode(depsNode)
                     || forbiddenTaskList.containsKey(depsNode)
-                    || skipTaskNodeList.containsKey(depsNode)){
+                    || skipTaskNodeList.containsKey(depsNode)) {
                 continue;
             }
             // dependencies must be fully completed
-            if(!completeTaskList.containsKey(depsNode)){
+            if (!completeTaskList.containsKey(depsNode)) {
                 return DependResult.WAITING;
             }
             ExecutionStatus depTaskState = 
completeTaskList.get(depsNode).getState();
-            if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
+            if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
                 return DependResult.WAITING;
             }
             // ignore task state if current task is condition
-            if(taskNode.isConditionsTask()){
+            if (taskNode.isConditionsTask()) {
                 continue;
             }
-            if(!dependTaskSuccess(depsNode, taskName)){
+            if (!dependTaskSuccess(depsNode, taskName)) {
                 return DependResult.FAILED;
             }
         }
@@ -621,20 +621,17 @@ public class MasterExecThread implements Runnable {
 
     /**
      * the depend node is completed, but we still need to check whether the 
next node is on the condition task's branch
-     * @param dependNodeName
-     * @param nextNodeName
-     * @return
      */
-    private boolean dependTaskSuccess(String dependNodeName, String 
nextNodeName){
-        if(dag.getNode(dependNodeName).isConditionsTask()){
+    private boolean dependTaskSuccess(String dependNodeName, String 
nextNodeName) {
+        if (dag.getNode(dependNodeName).isConditionsTask()) {
             //condition task need check the branch to run
             List<String> nextTaskList = 
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, 
completeTaskList);
-            if(!nextTaskList.contains(nextNodeName)){
+            if (!nextTaskList.contains(nextNodeName)) {
                 return false;
             }
-        }else {
+        } else {
             ExecutionStatus depTaskState = 
completeTaskList.get(dependNodeName).getState();
-            if(depTaskState.typeIsFailure()){
+            if (depTaskState.typeIsFailure()) {
                 return false;
             }
         }
@@ -643,13 +640,14 @@ public class MasterExecThread implements Runnable {
 
     /**
      * query task instance by complete state
+     *
      * @param state state
      * @return task instance list
      */
-    private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state){
+    private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state) {
         List<TaskInstance> resultList = new ArrayList<>();
-        for (Map.Entry<String, TaskInstance> entry: 
completeTaskList.entrySet()) {
-            if(entry.getValue().getState() == state){
+        for (Map.Entry<String, TaskInstance> entry : 
completeTaskList.entrySet()) {
+            if (entry.getValue().getState() == state) {
                 resultList.add(entry.getValue());
             }
         }
@@ -657,18 +655,19 @@ public class MasterExecThread implements Runnable {
     }
 
     /**
-     *  where there are ongoing tasks
+     * where there are ongoing tasks
+     *
      * @param state state
      * @return ExecutionStatus
      */
-    private ExecutionStatus runningState(ExecutionStatus state){
+    private ExecutionStatus runningState(ExecutionStatus state) {
         if (state == ExecutionStatus.READY_STOP
                 || state == ExecutionStatus.READY_PAUSE
                 || state == ExecutionStatus.WAITTING_THREAD
                 || state == ExecutionStatus.DELAY_EXECUTION) {
             // if the running task is not completed, the state remains 
unchanged
             return state;
-        }else{
+        } else {
             return ExecutionStatus.RUNNING_EXECUTION;
         }
     }
@@ -678,12 +677,12 @@ public class MasterExecThread implements Runnable {
      *
      * @return Boolean whether has failed task
      */
-    private boolean hasFailedTask(){
+    private boolean hasFailedTask() {
 
-        if(this.taskFailedSubmit){
+        if (this.taskFailedSubmit) {
             return true;
         }
-        if(this.errorTaskList.size() > 0){
+        if (this.errorTaskList.size() > 0) {
             return true;
         }
         return this.dependFailedTask.size() > 0;
@@ -694,9 +693,9 @@ public class MasterExecThread implements Runnable {
      *
      * @return Boolean whether process instance failed
      */
-    private boolean processFailed(){
-        if(hasFailedTask()) {
-            if(processInstance.getFailureStrategy() == FailureStrategy.END){
+    private boolean processFailed() {
+        if (hasFailedTask()) {
+            if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                 return true;
             }
             if (processInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE) {
@@ -708,9 +707,10 @@ public class MasterExecThread implements Runnable {
 
     /**
      * whether there is a task waiting for a thread
+     *
      * @return Boolean whether has waiting thread task
      */
-    private boolean hasWaitingThreadTask(){
+    private boolean hasWaitingThreadTask() {
         List<TaskInstance> waitingList = 
getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD);
         return CollectionUtils.isNotEmpty(waitingList);
     }
@@ -720,74 +720,75 @@ public class MasterExecThread implements Runnable {
      * 1, a failed retry task exists in the standby queue: return failure 
directly
      * 2, a paused task exists, the complement is not finished, or tasks are 
pending submission: return pause
      * 3, otherwise: return success
+     *
      * @return ExecutionStatus
      */
-    private ExecutionStatus processReadyPause(){
-        if(hasRetryTaskInStandBy()){
+    private ExecutionStatus processReadyPause() {
+        if (hasRetryTaskInStandBy()) {
             return ExecutionStatus.FAILURE;
         }
 
         List<TaskInstance> pauseList = 
getCompleteTaskByState(ExecutionStatus.PAUSE);
-        if(CollectionUtils.isNotEmpty(pauseList)
+        if (CollectionUtils.isNotEmpty(pauseList)
                 || !isComplementEnd()
-                || readyToSubmitTaskList.size() > 0){
+                || readyToSubmitTaskList.size() > 0) {
             return ExecutionStatus.PAUSE;
-        }else{
+        } else {
             return ExecutionStatus.SUCCESS;
         }
     }
 
-
     /**
      * generate the latest process instance status by the tasks state
+     *
      * @return process instance execution status
      */
-    private ExecutionStatus getProcessInstanceState(){
+    private ExecutionStatus getProcessInstanceState() {
         ProcessInstance instance = 
processService.findProcessInstanceById(processInstance.getId());
         ExecutionStatus state = instance.getState();
 
-        if(activeTaskNode.size() > 0 || hasRetryTaskInStandBy()){
+        if (activeTaskNode.size() > 0 || hasRetryTaskInStandBy()) {
             // active task and retry task exists
             return runningState(state);
         }
         // process failure
-        if(processFailed()){
+        if (processFailed()) {
             return ExecutionStatus.FAILURE;
         }
 
         // waiting thread
-        if(hasWaitingThreadTask()){
+        if (hasWaitingThreadTask()) {
             return ExecutionStatus.WAITTING_THREAD;
         }
 
         // pause
-        if(state == ExecutionStatus.READY_PAUSE){
+        if (state == ExecutionStatus.READY_PAUSE) {
             return processReadyPause();
         }
 
         // stop
-        if(state == ExecutionStatus.READY_STOP){
+        if (state == ExecutionStatus.READY_STOP) {
             List<TaskInstance> stopList = 
getCompleteTaskByState(ExecutionStatus.STOP);
             List<TaskInstance> killList = 
getCompleteTaskByState(ExecutionStatus.KILL);
-            if(CollectionUtils.isNotEmpty(stopList)
+            if (CollectionUtils.isNotEmpty(stopList)
                     || CollectionUtils.isNotEmpty(killList)
-                    || !isComplementEnd()){
+                    || !isComplementEnd()) {
                 return ExecutionStatus.STOP;
-            }else{
+            } else {
                 return ExecutionStatus.SUCCESS;
             }
         }
 
         // success
-        if(state == ExecutionStatus.RUNNING_EXECUTION){
+        if (state == ExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = 
getCompleteTaskByState(ExecutionStatus.KILL);
-            if(readyToSubmitTaskList.size() > 0){
+            if (readyToSubmitTaskList.size() > 0) {
                 //tasks currently pending submission, no retries, indicating 
that depend is waiting to complete
                 return ExecutionStatus.RUNNING_EXECUTION;
-            }else if(CollectionUtils.isNotEmpty(killTasks)){
+            } else if (CollectionUtils.isNotEmpty(killTasks)) {
                 // tasks maybe killed manually
                 return ExecutionStatus.FAILURE;
-            }else{
+            } else {
                 //  if the waiting queue is empty and the status is in 
progress, then success
                 return ExecutionStatus.SUCCESS;
             }
@@ -798,15 +799,14 @@ public class MasterExecThread implements Runnable {
 
     /**
      * whether standby task list has retry tasks
-     * @return
      */
     private boolean retryTaskExists() {
 
         boolean result = false;
 
-        for(String taskName : readyToSubmitTaskList.keySet()){
+        for (String taskName : readyToSubmitTaskList.keySet()) {
             TaskInstance task = readyToSubmitTaskList.get(taskName);
-            if(task.getState().typeIsFailure()){
+            if (task.getState().typeIsFailure()) {
                 result = true;
                 break;
             }
@@ -816,10 +816,11 @@ public class MasterExecThread implements Runnable {
 
     /**
      * whether complement end
+     *
      * @return Boolean whether is complement end
      */
     private boolean isComplementEnd() {
-        if(!processInstance.isComplementData()){
+        if (!processInstance.isComplementData()) {
             return true;
         }
 
@@ -828,7 +829,7 @@ public class MasterExecThread implements Runnable {
             Date endTime = 
DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
             return processInstance.getScheduleTime().equals(endTime);
         } catch (Exception e) {
-            logger.error("complement end failed ",e);
+            logger.error("complement end failed ", e);
             return false;
         }
     }
@@ -839,7 +840,7 @@ public class MasterExecThread implements Runnable {
      */
     private void updateProcessInstanceState() {
         ExecutionStatus state = getProcessInstanceState();
-        if(processInstance.getState() != state){
+        if (processInstance.getState() != state) {
             logger.info(
                     "work flow process instance [id: {}, name:{}], state 
change from {} to {}, cmd type: {}",
                     processInstance.getId(), processInstance.getName(),
@@ -856,38 +857,42 @@ public class MasterExecThread implements Runnable {
 
     /**
      * get task dependency result
+     *
      * @param taskInstance task instance
      * @return DependResult
      */
-    private DependResult getDependResultForTask(TaskInstance taskInstance){
+    private DependResult getDependResultForTask(TaskInstance taskInstance) {
         return isTaskDepsComplete(taskInstance.getName());
     }
 
     /**
      * add task to standby list
+     *
      * @param taskInstance task instance
      */
-    private void addTaskToStandByList(TaskInstance taskInstance){
+    private void addTaskToStandByList(TaskInstance taskInstance) {
         logger.info("add task to stand by list: {}", taskInstance.getName());
         readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), 
taskInstance);
     }
 
     /**
      * remove task from stand by list
+     *
      * @param taskInstance task instance
      */
-    private void removeTaskFromStandbyList(TaskInstance taskInstance){
+    private void removeTaskFromStandbyList(TaskInstance taskInstance) {
         logger.info("remove task from stand by list: {}", 
taskInstance.getName());
         readyToSubmitTaskList.remove(taskInstance.getName());
     }
 
     /**
      * has retry task in standby
+     *
      * @return Boolean whether has retry task in standby
      */
-    private boolean hasRetryTaskInStandBy(){
-        for (Map.Entry<String, TaskInstance> entry: 
readyToSubmitTaskList.entrySet()) {
-            if(entry.getValue().getState().typeIsFailure()){
+    private boolean hasRetryTaskInStandBy() {
+        for (Map.Entry<String, TaskInstance> entry : 
readyToSubmitTaskList.entrySet()) {
+            if (entry.getValue().getState().typeIsFailure()) {
                 return true;
             }
         }
@@ -897,44 +902,44 @@ public class MasterExecThread implements Runnable {
     /**
      * submit and watch the tasks, until the work flow stop
      */
-    private void runProcess(){
+    private void runProcess() {
         // submit start node
         submitPostNode(null);
         boolean sendTimeWarning = false;
-        while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){
+        while (!processInstance.isProcessInstanceStop() && 
Stopper.isRunning()) {
 
             // send warning email if process time out.
-            if(!sendTimeWarning && checkProcessTimeOut(processInstance) ){
+            if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
                 alertManager.sendProcessTimeoutAlert(processInstance,
                         
processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
                 sendTimeWarning = true;
             }
-            for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: 
activeTaskNode.entrySet()) {
+            for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : 
activeTaskNode.entrySet()) {
                 Future<Boolean> future = entry.getValue();
-                TaskInstance task  = entry.getKey().getTaskInstance();
+                TaskInstance task = entry.getKey().getTaskInstance();
 
-                if(!future.isDone()){
+                if (!future.isDone()) {
                     continue;
                 }
 
                 // node monitor thread complete
                 task = this.processService.findTaskInstanceById(task.getId());
 
-                if(task == null){
+                if (task == null) {
                     this.taskFailedSubmit = true;
                     activeTaskNode.remove(entry.getKey());
                     continue;
                 }
 
                 // node monitor thread complete
-                if(task.getState().typeIsFinished()){
+                if (task.getState().typeIsFinished()) {
                     activeTaskNode.remove(entry.getKey());
                 }
 
                 logger.info("task :{}, id:{} complete, state is {} ",
                         task.getName(), task.getId(), task.getState());
                 // node success , post node submit
-                if(task.getState() == ExecutionStatus.SUCCESS){
+                if (task.getState() == ExecutionStatus.SUCCESS) {
                     processInstance.setVarPool(task.getVarPool());
                     processService.updateProcessInstance(processInstance);
                     completeTaskList.put(task.getName(), task);
@@ -942,20 +947,20 @@ public class MasterExecThread implements Runnable {
                     continue;
                 }
                 // node fails, retry first, and then execute the failure 
process
-                if(task.getState().typeIsFailure()){
-                    if(task.getState() == 
ExecutionStatus.NEED_FAULT_TOLERANCE){
+                if (task.getState().typeIsFailure()) {
+                    if (task.getState() == 
ExecutionStatus.NEED_FAULT_TOLERANCE) {
                         this.recoverToleranceFaultTaskList.add(task);
                     }
-                    if(task.taskCanRetry()){
+                    if (task.taskCanRetry()) {
                         addTaskToStandByList(task);
-                    }else{
+                    } else {
                         completeTaskList.put(task.getName(), task);
-                        if( task.isConditionsTask()
-                            || 
DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
+                        if (task.isConditionsTask()
+                                || 
DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
                             submitPostNode(task.getName());
-                        }else{
+                        } else {
                             errorTaskList.put(task.getName(), task);
-                            if(processInstance.getFailureStrategy() == 
FailureStrategy.END){
+                            if (processInstance.getFailureStrategy() == 
FailureStrategy.END) {
                                 killTheOtherTasks();
                             }
                         }
@@ -966,30 +971,30 @@ public class MasterExecThread implements Runnable {
                 completeTaskList.put(task.getName(), task);
             }
             // send alert
-            if(CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)){
+            if 
(CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)) {
                 alertManager.sendAlertWorkerToleranceFault(processInstance, 
recoverToleranceFaultTaskList);
                 this.recoverToleranceFaultTaskList.clear();
             }
             // updateProcessInstance completed task status
             // failure priority is higher than pause
             // if a task fails, other paused tasks need to be reset to kill
-            if(errorTaskList.size() > 0){
-                for(Map.Entry<String, TaskInstance> entry: 
completeTaskList.entrySet()) {
+            if (errorTaskList.size() > 0) {
+                for (Map.Entry<String, TaskInstance> entry : 
completeTaskList.entrySet()) {
                     TaskInstance completeTask = entry.getValue();
-                    if(completeTask.getState()== ExecutionStatus.PAUSE){
+                    if (completeTask.getState() == ExecutionStatus.PAUSE) {
                         completeTask.setState(ExecutionStatus.KILL);
                         completeTaskList.put(entry.getKey(), completeTask);
                         processService.updateTaskInstance(completeTask);
                     }
                 }
             }
-            if(canSubmitTaskToQueue()){
+            if (canSubmitTaskToQueue()) {
                 submitStandByTask();
             }
             try {
                 Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (InterruptedException e) {
-                logger.error(e.getMessage(),e);
+                logger.error(e.getMessage(), e);
                 Thread.currentThread().interrupt();
             }
             updateProcessInstanceState();
@@ -1000,29 +1005,30 @@ public class MasterExecThread implements Runnable {
 
     /**
      * check whether the process instance has timed out
+     *
      * @param processInstance process instance
      * @return true if the process instance has run longer than its timeout
      */
     private boolean checkProcessTimeOut(ProcessInstance processInstance) {
-        if(processInstance.getTimeout() == 0 ){
+        if (processInstance.getTimeout() == 0) {
             return false;
         }
 
         Date now = new Date();
-        long runningTime =  DateUtils.diffMin(now, 
processInstance.getStartTime());
+        long runningTime = DateUtils.diffMin(now, 
processInstance.getStartTime());
 
         return runningTime > processInstance.getTimeout();
     }
 
     /**
      * whether can submit task to queue
+     *
      * @return boolean
      */
     private boolean canSubmitTaskToQueue() {
         return OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), 
masterConfig.getMasterReservedMemory());
     }
 
-
     /**
      * close the ongoing tasks
      */
@@ -1036,7 +1042,7 @@ public class MasterExecThread implements Runnable {
 
             TaskInstance taskInstance = taskExecThread.getTaskInstance();
             taskInstance = 
processService.findTaskInstanceById(taskInstance.getId());
-            if(taskInstance != null && 
taskInstance.getState().typeIsFinished()){
+            if (taskInstance != null && 
taskInstance.getState().typeIsFinished()) {
                 continue;
             }
 
@@ -1052,16 +1058,19 @@ public class MasterExecThread implements Runnable {
 
     /**
      * whether the retry interval is timed out
+     *
      * @param taskInstance task instance
      * @return Boolean
      */
-    private boolean retryTaskIntervalOverTime(TaskInstance taskInstance){
-        if(taskInstance.getState() != ExecutionStatus.FAILURE){
+    private boolean retryTaskIntervalOverTime(TaskInstance taskInstance) {
+        if (taskInstance.getState() != ExecutionStatus.FAILURE) {
             return true;
         }
-        if(taskInstance.getId() == 0 ||
-                taskInstance.getMaxRetryTimes() ==0 ||
-                taskInstance.getRetryInterval() == 0 ){
+        if (taskInstance.getId() == 0
+                ||
+                taskInstance.getMaxRetryTimes() == 0
+                ||
+                taskInstance.getRetryInterval() == 0) {
             return true;
         }
         Date now = new Date();
@@ -1073,62 +1082,64 @@ public class MasterExecThread implements Runnable {
     /**
      * handling the list of tasks to be submitted
      */
-    private void submitStandByTask(){
-        for(Map.Entry<String, TaskInstance> entry: 
readyToSubmitTaskList.entrySet()) {
+    private void submitStandByTask() {
+        for (Map.Entry<String, TaskInstance> entry : 
readyToSubmitTaskList.entrySet()) {
             TaskInstance task = entry.getValue();
             DependResult dependResult = getDependResultForTask(task);
-            if(DependResult.SUCCESS == dependResult){
-                if(retryTaskIntervalOverTime(task)){
+            if (DependResult.SUCCESS == dependResult) {
+                if (retryTaskIntervalOverTime(task)) {
                     submitTaskExec(task);
                     removeTaskFromStandbyList(task);
                 }
-            }else if(DependResult.FAILED == dependResult){
+            } else if (DependResult.FAILED == dependResult) {
                 // if the dependency fails, the current node is not submitted 
and the state changes to failure.
                 dependFailedTask.put(entry.getKey(), task);
                 removeTaskFromStandbyList(task);
-                logger.info("task {},id:{} depend result : {}",task.getName(), 
task.getId(), dependResult);
+                logger.info("task {},id:{} depend result : {}", 
task.getName(), task.getId(), dependResult);
             }
         }
     }
 
     /**
      * get recovery task instance
+     *
      * @param taskId task id
      * @return recovery task instance
      */
-    private TaskInstance getRecoveryTaskInstance(String taskId){
-        if(!StringUtils.isNotEmpty(taskId)){
+    private TaskInstance getRecoveryTaskInstance(String taskId) {
+        if (!StringUtils.isNotEmpty(taskId)) {
             return null;
         }
         try {
             Integer intId = Integer.valueOf(taskId);
             TaskInstance task = processService.findTaskInstanceById(intId);
-            if(task == null){
-                logger.error("start node id cannot be found: {}",  taskId);
-            }else {
+            if (task == null) {
+                logger.error("start node id cannot be found: {}", taskId);
+            } else {
                 return task;
             }
-        }catch (Exception e){
-            logger.error("get recovery task instance failed ",e);
+        } catch (Exception e) {
+            logger.error("get recovery task instance failed ", e);
         }
         return null;
     }
 
     /**
      * get start task instance list
+     *
      * @param cmdParam command param
      * @return task instance list
      */
-    private List<TaskInstance> getStartTaskInstanceList(String cmdParam){
+    private List<TaskInstance> getStartTaskInstanceList(String cmdParam) {
 
         List<TaskInstance> instanceList = new ArrayList<>();
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
 
-        if(paramMap != null && 
paramMap.containsKey(CMDPARAM_RECOVERY_START_NODE_STRING)){
-            String[] idList = 
paramMap.get(CMDPARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
-            for(String nodeId : idList){
+        if (paramMap != null && 
paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
+            String[] idList = 
paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
+            for (String nodeId : idList) {
                 TaskInstance task = getRecoveryTaskInstance(nodeId);
-                if(task != null){
+                if (task != null) {
                     instanceList.add(task);
                 }
             }
@@ -1138,17 +1149,18 @@ public class MasterExecThread implements Runnable {
 
     /**
      * parse "StartNodeNameList" from cmd param
+     *
      * @param cmdParam command param
      * @return start node name list
      */
-    private List<String> parseStartNodeName(String cmdParam){
+    private List<String> parseStartNodeName(String cmdParam) {
         List<String> startNodeNameList = new ArrayList<>();
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
-        if(paramMap == null){
+        if (paramMap == null) {
             return startNodeNameList;
         }
-        if(paramMap.containsKey(CMDPARAM_START_NODE_NAMES)){
-            startNodeNameList = 
Arrays.asList(paramMap.get(CMDPARAM_START_NODE_NAMES).split(Constants.COMMA));
+        if (paramMap.containsKey(CMD_PARAM_START_NODE_NAMES)) {
+            startNodeNameList = 
Arrays.asList(paramMap.get(CMD_PARAM_START_NODE_NAMES).split(Constants.COMMA));
         }
         return startNodeNameList;
     }
@@ -1156,11 +1168,12 @@ public class MasterExecThread implements Runnable {
     /**
      * generate start node name list from parsing command param;
      * if "StartNodeIdList" exists in command param, return StartNodeIdList
+     *
      * @return recovery node name list
      */
-    private List<String> getRecoveryNodeNameList(){
+    private List<String> getRecoveryNodeNameList() {
         List<String> recoveryNodeNameList = new ArrayList<>();
-        if(CollectionUtils.isNotEmpty(recoverNodeIdList)) {
+        if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
             for (TaskInstance task : recoverNodeIdList) {
                 recoveryNodeNameList.add(task.getName());
             }
@@ -1170,17 +1183,18 @@ public class MasterExecThread implements Runnable {
 
     /**
      * generate flow dag
+     *
      * @param processDefinitionJson process definition json
-     * @param startNodeNameList     start node name list
-     * @param recoveryNodeNameList  recovery node name list
-     * @param depNodeType           depend node type
+     * @param startNodeNameList start node name list
+     * @param recoveryNodeNameList recovery node name list
+     * @param depNodeType depend node type
      * @return ProcessDag           process dag
-     * @throws Exception            exception
+     * @throws Exception exception
      */
     public ProcessDag generateFlowDag(String processDefinitionJson,
                                       List<String> startNodeNameList,
                                       List<String> recoveryNodeNameList,
-                                      TaskDependType depNodeType)throws 
Exception{
+                                      TaskDependType depNodeType) throws 
Exception {
         return DagHelper.generateFlowDag(processDefinitionJson, 
startNodeNameList, recoveryNodeNameList, depNodeType);
     }
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index bf1e7e2..6979a93 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -14,19 +14,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master;
 
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.common.enums.*;
+import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,15 +58,6 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.context.ApplicationContext;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.util.*;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.mock;
 
 /**
  * test for MasterExecThread
@@ -66,7 +79,7 @@ public class MasterExecThreadTest {
     private ApplicationContext applicationContext;
 
     @Before
-    public void init() throws Exception{
+    public void init() throws Exception {
         processService = mock(ProcessService.class);
 
         applicationContext = mock(ApplicationContext.class);
@@ -92,7 +105,7 @@ public class MasterExecThreadTest {
         masterExecThread = PowerMockito.spy(new MasterExecThread(
                 processInstance
                 , processService
-                ,null, null, config));
+                , null, null, config));
         // prepareProcess init dag
         Field dag = MasterExecThread.class.getDeclaredField("dag");
         dag.setAccessible(true);
@@ -106,18 +119,17 @@ public class MasterExecThreadTest {
 
     /**
      * without schedule
-     * @throws ParseException
      */
     @Test
     public void testParallelWithOutSchedule() throws ParseException {
-        try{
+        try {
             
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
             Method method = 
MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
             method.setAccessible(true);
             method.invoke(masterExecThread);
             // one create save, and 1-30 for next save, and last day 20 no save
             verify(processService, 
times(20)).saveProcessInstance(processInstance);
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
         }
@@ -125,27 +137,86 @@ public class MasterExecThreadTest {
 
     /**
      * with schedule
-     * @throws ParseException
      */
     @Test
-    public void testParallelWithSchedule() throws ParseException {
-        try{
+    public void testParallelWithSchedule() {
+        try {
             
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
             Method method = 
MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
             method.setAccessible(true);
             method.invoke(masterExecThread);
             // one create save, and 9(1 to 20 step 2) for next save, and last 
day 31 no save
             verify(processService, 
times(9)).saveProcessInstance(processInstance);
-        }catch (Exception e){
+        } catch (Exception e) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testParseStartNodeName() throws ParseException {
+        try {
+            Map<String, String> cmdParam = new HashMap<>();
+            cmdParam.put(CMD_PARAM_START_NODE_NAMES, "t1,t2,t3");
+            
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam));
+            Class<MasterExecThread> masterExecThreadClass = 
MasterExecThread.class;
+            Method method = 
masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class);
+            method.setAccessible(true);
+            List<String> nodeNames = (List<String>) 
method.invoke(masterExecThread, JSONUtils.toJsonString(cmdParam));
+            Assert.assertEquals(3, nodeNames.size());
+        } catch (Exception e) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testRetryTaskIntervalOverTime() {
+        try {
+            TaskInstance taskInstance = new TaskInstance();
+            taskInstance.setId(0);
+            taskInstance.setMaxRetryTimes(0);
+            taskInstance.setRetryInterval(0);
+            taskInstance.setState(ExecutionStatus.FAILURE);
+            Class<MasterExecThread> masterExecThreadClass = 
MasterExecThread.class;
+            Method method = 
masterExecThreadClass.getDeclaredMethod("retryTaskIntervalOverTime", 
TaskInstance.class);
+            method.setAccessible(true);
+            Assert.assertTrue((Boolean) method.invoke(masterExecThread, 
taskInstance));
+        } catch (Exception e) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testGetStartTaskInstanceList() {
+        try {
+            TaskInstance taskInstance1 = new TaskInstance();
+            taskInstance1.setId(1);
+            TaskInstance taskInstance2 = new TaskInstance();
+            taskInstance2.setId(2);
+            TaskInstance taskInstance3 = new TaskInstance();
+            taskInstance3.setId(3);
+            TaskInstance taskInstance4 = new TaskInstance();
+            taskInstance4.setId(4);
+            Map<String, String> cmdParam = new HashMap<>();
+            cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
+            
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance1);
+            
Mockito.when(processService.findTaskInstanceById(2)).thenReturn(taskInstance2);
+            
Mockito.when(processService.findTaskInstanceById(3)).thenReturn(taskInstance3);
+            
Mockito.when(processService.findTaskInstanceById(4)).thenReturn(taskInstance4);
+            Class<MasterExecThread> masterExecThreadClass = 
MasterExecThread.class;
+            Method method = 
masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", 
String.class);
+            method.setAccessible(true);
+            List<TaskInstance> taskInstances = (List<TaskInstance>) 
method.invoke(masterExecThread, JSONUtils.toJsonString(cmdParam));
+            Assert.assertEquals(4, taskInstances.size());
+        } catch (Exception e) {
             Assert.fail();
         }
     }
 
-    private List<Schedule> zeroSchedulerList(){
+    private List<Schedule> zeroSchedulerList() {
         return Collections.EMPTY_LIST;
     }
 
-    private List<Schedule> oneSchedulerList(){
+    private List<Schedule> oneSchedulerList() {
         List<Schedule> schedulerList = new LinkedList<>();
         Schedule schedule = new Schedule();
         schedule.setCrontab("0 0 0 1/2 * ?");
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 38f9573..6262985 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.service.process;
 
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_EMPTY_SUB_PROCESS;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
 import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
 
 import static java.util.stream.Collectors.toSet;
@@ -266,14 +266,14 @@ public class ProcessService {
 
         if (cmdTypeMap.containsKey(commandType)) {
             ObjectNode cmdParamObj = 
JSONUtils.parseObject(command.getCommandParam());
-            int processInstanceId = 
cmdParamObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt();
+            int processInstanceId = 
cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
 
             List<Command> commands = commandMapper.selectList(null);
             // for all commands
             for (Command tmpCommand : commands) {
                 if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
                     ObjectNode tempObj = 
JSONUtils.parseObject(tmpCommand.getCommandParam());
-                    if (tempObj != null && processInstanceId == 
tempObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
+                    if (tempObj != null && processInstanceId == 
tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
                         isNeedCreate = false;
                         break;
                     }
@@ -439,7 +439,7 @@ public class ProcessService {
             for (TaskNode taskNode : taskNodeList) {
                 String parameter = taskNode.getParams();
                 ObjectNode parameterJson = JSONUtils.parseObject(parameter);
-                if (parameterJson.get(CMDPARAM_SUB_PROCESS_DEFINE_ID) != null) 
{
+                if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_ID) != 
null) {
                     SubProcessParameters subProcessParam = 
JSONUtils.parseObject(parameter, SubProcessParameters.class);
                     ids.add(subProcessParam.getProcessDefinitionId());
                     
recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids);
@@ -468,7 +468,7 @@ public class ProcessService {
             return;
         }
         Map<String, String> cmdParam = new HashMap<>();
-        cmdParam.put(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD, 
String.valueOf(processInstance.getId()));
+        cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, 
String.valueOf(processInstance.getId()));
         // process instance quit by "waiting thread" state
         if (originCommand == null) {
             Command command = new Command(
@@ -613,8 +613,8 @@ public class ProcessService {
     private Boolean checkCmdParam(Command command, Map<String, String> 
cmdParam) {
         if (command.getTaskDependType() == TaskDependType.TASK_ONLY || 
command.getTaskDependType() == TaskDependType.TASK_PRE) {
             if (cmdParam == null
-                || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES)
-                || 
cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()) {
+                || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
+                || 
cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
                 logger.error("command node depend type is {}, but start nodes 
is null ", command.getTaskDependType());
                 return false;
             }
@@ -647,20 +647,20 @@ public class ProcessService {
         if (cmdParam != null) {
             Integer processInstanceId = 0;
             // recover from failure or pause tasks
-            if 
(cmdParam.containsKey(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING)) {
-                String processId = 
cmdParam.get(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING);
+            if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
+                String processId = 
cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
                 processInstanceId = Integer.parseInt(processId);
                 if (processInstanceId == 0) {
                     logger.error("command parameter is error, [ 
ProcessInstanceId ] is 0");
                     return null;
                 }
-            } else if (cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)) {
+            } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
                 // sub process map
-                String pId = cmdParam.get(Constants.CMDPARAM_SUB_PROCESS);
+                String pId = cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS);
                 processInstanceId = Integer.parseInt(pId);
-            } else if 
(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD)) {
+            } else if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
                 // waiting thread command
-                String pId = 
cmdParam.get(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD);
+                String pId = 
cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
                 processInstanceId = Integer.parseInt(pId);
             }
             if (processInstanceId == 0) {
@@ -681,7 +681,7 @@ public class ProcessService {
                 }
             }
             // reset command parameter if sub process
-            if (cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)) {
+            if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
                 processInstance.setCommandParam(command.getCommandParam());
             }
         } else {
@@ -708,14 +708,14 @@ public class ProcessService {
                 List<Integer> failedList = 
this.findTaskIdByInstanceState(processInstance.getId(), 
ExecutionStatus.FAILURE);
                 List<Integer> toleranceList = 
this.findTaskIdByInstanceState(processInstance.getId(), 
ExecutionStatus.NEED_FAULT_TOLERANCE);
                 List<Integer> killedList = 
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
-                cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+                
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
 
                 failedList.addAll(killedList);
                 failedList.addAll(toleranceList);
                 for (Integer taskId : failedList) {
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
-                cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
+                cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
                     String.join(Constants.COMMA, 
convertIntListToString(failedList)));
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
@@ -726,7 +726,7 @@ public class ProcessService {
                 break;
             case RECOVER_SUSPENDED_PROCESS:
                 // find pause tasks and init task's state
-                cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+                
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                 List<Integer> suspendedNodeList = 
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
                 List<Integer> stopNodeList = 
findTaskIdByInstanceState(processInstance.getId(),
                     ExecutionStatus.KILL);
@@ -735,7 +735,7 @@ public class ProcessService {
                     // initialize the pause state
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
-                cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, 
String.join(",", convertIntListToString(suspendedNodeList)));
+                cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, 
String.join(",", convertIntListToString(suspendedNodeList)));
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;
@@ -755,8 +755,8 @@ public class ProcessService {
                 break;
             case REPEAT_RUNNING:
                 // delete the recover task names from command parameter
-                if 
(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)) {
-                    
cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+                if 
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
+                    
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                     
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 }
                 // delete all the valid tasks when repeat running
@@ -835,16 +835,16 @@ public class ProcessService {
         }
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
         // write sub process id into cmd param.
-        if (paramMap.containsKey(CMDPARAM_SUB_PROCESS)
-            && 
CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))) {
-            paramMap.remove(CMDPARAM_SUB_PROCESS);
-            paramMap.put(CMDPARAM_SUB_PROCESS, 
String.valueOf(subProcessInstance.getId()));
+        if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
+            && 
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+            paramMap.remove(CMD_PARAM_SUB_PROCESS);
+            paramMap.put(CMD_PARAM_SUB_PROCESS, 
String.valueOf(subProcessInstance.getId()));
             
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
             subProcessInstance.setIsSubProcess(Flag.YES);
             this.saveProcessInstance(subProcessInstance);
         }
         // copy parent instance user def params to sub process..
-        String parentInstanceId = 
paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
+        String parentInstanceId = 
paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
         if (StringUtils.isNotEmpty(parentInstanceId)) {
             ProcessInstance parentInstance = 
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
             if (parentInstance != null) {
@@ -1058,7 +1058,7 @@ public class ProcessService {
         CommandType commandType = getSubCommandType(parentProcessInstance, 
childInstance);
         TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), 
TaskNode.class);
         Map<String, String> subProcessParam = 
JSONUtils.toMap(taskNode.getParams());
-        Integer childDefineId = 
Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
+        Integer childDefineId = 
Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
         String processParam = getSubWorkFlowParam(instanceMap, 
parentProcessInstance);
 
         return new Command(
@@ -1443,7 +1443,7 @@ public class ProcessService {
      * @return create process instance result
      */
     public int createWorkProcessInstanceMap(ProcessInstanceMap 
processInstanceMap) {
-        Integer count = 0;
+        int count = 0;
         if (processInstanceMap != null) {
             return processInstanceMapMapper.insert(processInstanceMap);
         }
@@ -1647,7 +1647,7 @@ public class ProcessService {
         //2 insert into recover command
         Command cmd = new Command();
         cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
-        cmd.setCommandParam(String.format("{\"%s\":%d}", 
Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+        cmd.setCommandParam(String.format("{\"%s\":%d}", 
Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
         cmd.setExecutorId(processInstance.getExecutorId());
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
         createCommand(cmd);
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 74b52bb..4ac91f0 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -17,30 +17,75 @@
 
 package org.apache.dolphinscheduler.service.process;
 
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
 /**
  * process service test
  */
+@RunWith(MockitoJUnitRunner.class)
 public class ProcessServiceTest {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(CronUtilsTest.class);
+
+    @InjectMocks
+    private ProcessService processService;
+
+
+    @Mock
+    private CommandMapper commandMapper;
+
+
+    @Mock
+    private ErrorCommandMapper errorCommandMapper;
+
+    @Mock
+    private ProcessDefinitionMapper processDefineMapper;
+    @Mock
+    private ProcessInstanceMapper processInstanceMapper;
+    @Mock
+    private UserMapper userMapper;
+    @Mock
+    TaskInstanceMapper taskInstanceMapper;
+
     @Test
     public void testCreateSubCommand() {
         ProcessService processService = new ProcessService();
@@ -89,7 +134,7 @@ public class ProcessServiceTest {
         String endString = "2020-01-10 00:00:00";
         parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
         
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
-        Map<String,String> complementMap = new HashMap<>();
+        Map<String, String> complementMap = new HashMap<>();
         complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, 
startString);
         complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, 
endString);
         parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
@@ -113,4 +158,168 @@ public class ProcessServiceTest {
         );
         Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, 
command.getCommandType());
     }
+
+    @Test
+    public void testVerifyIsNeedCreateCommand() {
+
+        List<Command> commands = new ArrayList<>();
+
+        Command command = new Command();
+        command.setCommandType(CommandType.REPEAT_RUNNING);
+        command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + 
"\":\"111\"}");
+        commands.add(command);
+        Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
+        Assert.assertFalse(processService.verifyIsNeedCreateCommand(command));
+
+        Command command1 = new Command();
+        command1.setCommandType(CommandType.REPEAT_RUNNING);
+        command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + 
"\":\"222\"}");
+        Assert.assertTrue(processService.verifyIsNeedCreateCommand(command1));
+
+        Command command2 = new Command();
+        command2.setCommandType(CommandType.PAUSE);
+        Assert.assertTrue(processService.verifyIsNeedCreateCommand(command2));
+    }
+
+    /**
+     * Calls {@code createRecoveryWaitingThreadCommand} with several combinations of origin
+     * command and process instance. Nothing is asserted on the outcome, so the test only
+     * checks that none of the calls throws.
+     */
+    @Test
+    public void testCreateRecoveryWaitingThreadCommand() {
+
+        int id = 123;
+        Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
+
+        // origin command with an id, process instance flagged as a sub process
+        ProcessInstance subProcessInstance = new ProcessInstance();
+        subProcessInstance.setIsSubProcess(Flag.YES);
+        Command originCommand = new Command();
+        originCommand.setId(id);
+        processService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
+
+        // no origin command
+        // NOTE(review): this call reuses subProcessInstance; presumably a process instance
+        // that is not a sub process (id 111) was intended here - confirm.
+        processService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
+
+        // origin command of type RECOVER_WAITTING_THREAD
+        Command recoverCommand = new Command();
+        recoverCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
+        processService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
+
+        // origin command of type REPEAT_RUNNING; the type is set on repeatRunningCommand
+        // itself so that recoverCommand is not silently retyped
+        Command repeatRunningCommand = new Command();
+        repeatRunningCommand.setCommandType(CommandType.REPEAT_RUNNING);
+        processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
+
+        // same REPEAT_RUNNING command, process instance with id 111 that is not a sub process
+        ProcessInstance subProcessInstance2 = new ProcessInstance();
+        subProcessInstance2.setId(111);
+        subProcessInstance2.setIsSubProcess(Flag.NO);
+        processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
+
+    }
+
+    @Test
+    public void testHandleCommand() {
+        // Feeds handleCommand five commands against stubbed mappers; only null / non-null of the result is asserted.
+        // Case 1: the definition mapper is stubbed to return null for definition id 222, and a null result is expected.
+        String host = "127.0.0.1";
+        int validThreadNum = 1;
+        Command command = new Command();
+        command.setProcessDefinitionId(222);
+        command.setCommandType(CommandType.REPEAT_RUNNING);
+        command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + 
"\":\"111\",\""
+                + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
+        
Mockito.when(processDefineMapper.selectById(command.getProcessDefinitionId())).thenReturn(null);
+        Assert.assertNull(processService.handleCommand(logger, host, 
validThreadNum, command));
+        // Case 2: START_PROCESS for definition id 123, which the definition mapper is stubbed to return; non-null expected.
+        // NOTE(review): an earlier label here read "there is not enough thread for this command", which does not match the non-null assertion below - confirm the intended scenario.
+        Command command1 = new Command();
+        command1.setProcessDefinitionId(123);
+        command1.setCommandParam("{\"ProcessInstanceId\":222}");
+        command1.setCommandType(CommandType.START_PROCESS);
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(123);
+        processDefinition.setName("test");
+        processDefinition.setVersion(1);
+        
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+                + 
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+                + 
",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+                + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo 
\\\"123123\\\"\",\"resourceList\":[]}"
+                + 
",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+                + 
",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+                + 
",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(222);
+        
Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
+        
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command1));
+        // Case 3: RECOVER_SUSPENDED_PROCESS with ProcessInstanceId and StartNodeIdList params; same definition id 123 as stubbed above.
+        Command command2 = new Command();
+        
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
+        command2.setProcessDefinitionId(123);
+        command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
+
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command2));
+        // Case 4: START_FAILURE_TASK_PROCESS with a WaitingThreadInstanceId param of 222.
+        Command command3 = new Command();
+        command3.setProcessDefinitionId(123);
+        command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
+        command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command3));
+        // Case 5: REPEAT_RUNNING with WaitingThreadInstanceId and StartNodeIdList params.
+        Command command4 = new Command();
+        command4.setProcessDefinitionId(123);
+        
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
+        command4.setCommandType(CommandType.REPEAT_RUNNING);
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command4));
+    }
+
+    /**
+     * Checks that {@code getUserById} returns the user the user mapper is stubbed to supply.
+     */
+    @Test
+    public void testGetUserById() {
+        final int userId = 123;
+        User expectedUser = new User();
+        expectedUser.setId(userId);
+        Mockito.when(userMapper.selectById(userId)).thenReturn(expectedUser);
+
+        User actualUser = processService.getUserById(userId);
+        Assert.assertEquals(expectedUser, actualUser);
+    }
+
+    @Test
+    public void testFormatTaskAppId() { // expects "" while both lookups yield null, and "111_222_333" once both resolve
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(333);
+        taskInstance.setProcessDefinitionId(111);
+        taskInstance.setProcessInstanceId(222);
+        // NOTE(review): processService is the @InjectMocks object, not a @Mock; these when(...) calls presumably end up stubbing the mapper mock it delegates to - confirm, or stub the mappers directly.
Mockito.when(processService.findProcessDefineById(taskInstance.getProcessDefinitionId())).thenReturn(null);
+        
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(null);
+        Assert.assertEquals("", processService.formatTaskAppId(taskInstance));
+        // Second half: both lookups are re-stubbed to return objects carrying ids 111 and 222.
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(111);
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(222);
+        
Mockito.when(processService.findProcessDefineById(taskInstance.getProcessDefinitionId())).thenReturn(processDefinition);
+        
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
+        Assert.assertEquals("111_222_333", 
processService.formatTaskAppId(taskInstance));
+
+    }
+
+    @Test
+    public void testRecurseFindSubProcessId() {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        // Parent definition JSON: a single SHELL task whose params carry "processDefinitionId":"222".
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+                + 
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\""
+                + 
",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\""
+                + 
",\"maxRetryTimes\":\"0\",\"name\":\"test\",\"params\":{\"localParams\":[],"
+                + "\"rawScript\":\"echo 
\\\"123123\\\"\",\"resourceList\":[],\"processDefinitionId\""
+                + 
":\"222\"},\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\","
+                + 
"\"taskInstancePriority\":\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":"
+                + 
"null,\"strategy\":\"\"},\"type\":\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],"
+                + "\"tenantId\":4,\"timeout\":0}");
+        int parentId = 111;
+        List<Integer> ids = new ArrayList<>();
+        ProcessDefinition processDefinition2 = new ProcessDefinition();
+        // Second definition JSON: the same kind of task, but its params hold no processDefinitionId entry.
processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\""
+                + 
":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+                + 
"\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\","
+                + "\"params\":{\"localParams\":[],\"rawScript\":\"echo 
\\\"123123\\\"\",\"resourceList\":[]},"
+                + 
"\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":"
+                + 
"\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":"
+                + 
"\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+        // The definition mapper is stubbed so that id 111 yields the parent definition and id 222 the second one.
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
+        
Mockito.when(processDefineMapper.selectById(222)).thenReturn(processDefinition2);
+        processService.recurseFindSubProcessId(parentId, ids);
+        // NOTE(review): ids is never inspected afterwards, so this test only checks that the call completes without throwing.
+    }
 }

Reply via email to