This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc8b87e  [Feature-#6268][server-master] Serial execute process (#6185)
dc8b87e is described below

commit dc8b87e473886723cba02b064bcde6d7947891d6
Author: wangxj3 <[email protected]>
AuthorDate: Fri Nov 5 17:25:45 2021 +0800

    [Feature-#6268][server-master] Serial execute process (#6185)
    
    * add serial processInstance
    
    * fix bug
    
    * add test
    
    * fix code style
    
    * fix style code
    
    * add sql
    
    * fix sql error
    
    * add api
    
    * add test
    
    * fix code style
    
    * modify api
    
    * delete column , fix mapper
    
    * fix unimport
    
    * fix test
    
    * fix bug of missing param for Python
    
    * fix code style
    
    * fix test
    
    * fix test
    
    Co-authored-by: wangxj <wangxj31>
---
 .../controller/ProcessDefinitionController.java    |   9 +-
 .../api/service/ProcessDefinitionService.java      |   7 +-
 .../service/impl/ProcessDefinitionServiceImpl.java | 168 +++++++++++----------
 .../ProcessDefinitionControllerTest.java           |   9 +-
 .../api/service/ProcessDefinitionServiceTest.java  |   3 +-
 .../apache/dolphinscheduler/common/Constants.java  |   6 +
 .../dolphinscheduler/common/enums/CommandType.java |   4 +-
 .../common/enums/ExecutionStatus.java              |   3 +-
 .../common/enums/ProcessExecutionTypeEnum.java     |  79 ++++++++++
 .../dao/entity/ProcessDefinition.java              |  18 ++-
 .../dao/entity/ProcessDefinitionLog.java           |   2 +
 .../dao/entity/ProcessInstance.java                |  11 ++
 .../dao/mapper/ProcessInstanceMapper.java          |   7 +
 .../dao/mapper/ProcessDefinitionLogMapper.xml      |   4 +-
 .../dao/mapper/ProcessDefinitionMapper.xml         |  10 +-
 .../dao/mapper/ProcessInstanceMapMapper.xml        |   2 +-
 .../dao/mapper/ProcessInstanceMapper.xml           |  31 +++-
 .../pydolphinscheduler/core/process_definition.py  |   1 +
 .../server/PythonGatewayServer.java                |   8 +-
 .../master/runner/WorkflowExecuteThread.java       |  29 ++++
 .../server/master/WorkflowExecuteThreadTest.java   |  35 ++++-
 .../service/process/ProcessService.java            |  77 +++++++++-
 .../service/process/ProcessServiceTest.java        |  83 +++++++++-
 sql/dolphinscheduler_h2.sql                        |   3 +
 sql/dolphinscheduler_mysql.sql                     |   3 +
 sql/dolphinscheduler_postgre.sql                   |   3 +
 26 files changed, 501 insertions(+), 114 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index d6cb1ba..6257918 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -41,6 +41,7 @@ import 
org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -126,9 +127,10 @@ public class ProcessDefinitionController extends 
BaseController {
                                           @RequestParam(value = "timeout", 
required = false, defaultValue = "0") int timeout,
                                           @RequestParam(value = "tenantCode", 
required = true) String tenantCode,
                                           @RequestParam(value = 
"taskRelationJson", required = true) String taskRelationJson,
-                                          @RequestParam(value = 
"taskDefinitionJson", required = true) String taskDefinitionJson) {
+                                          @RequestParam(value = 
"taskDefinitionJson", required = true) String taskDefinitionJson,
+                                          @RequestParam(value = 
"executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum 
executionType) {
         Map<String, Object> result = 
processDefinitionService.createProcessDefinition(loginUser, projectCode, name, 
description, globalParams,
-            locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson);
+            locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson,executionType);
         return returnDataList(result);
     }
 
@@ -244,10 +246,11 @@ public class ProcessDefinitionController extends 
BaseController {
                                           @RequestParam(value = "tenantCode", 
required = true) String tenantCode,
                                           @RequestParam(value = 
"taskRelationJson", required = true) String taskRelationJson,
                                           @RequestParam(value = 
"taskDefinitionJson", required = true) String taskDefinitionJson,
+                                          @RequestParam(value = 
"executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum 
executionType,
                                           @RequestParam(value = 
"releaseState", required = false, defaultValue = "OFFLINE") ReleaseState 
releaseState) {
 
         Map<String, Object> result = 
processDefinitionService.updateProcessDefinition(loginUser, projectCode, name, 
code, description, globalParams,
-            locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson);
+            locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson,executionType);
         //  If the update fails, the result will be returned directly
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return returnDataList(result);
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 42fce02..723717d 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.dao.entity.User;
 
@@ -56,7 +57,8 @@ public interface ProcessDefinitionService {
                                                 int timeout,
                                                 String tenantCode,
                                                 String taskRelationJson,
-                                                String taskDefinitionJson);
+                                                String taskDefinitionJson,
+                                                ProcessExecutionTypeEnum 
executionType);
 
     /**
      * query process definition list
@@ -164,7 +166,8 @@ public interface ProcessDefinitionService {
                                                 int timeout,
                                                 String tenantCode,
                                                 String taskRelationJson,
-                                                String taskDefinitionJson);
+                                                String taskDefinitionJson,
+                                                ProcessExecutionTypeEnum 
executionType);
 
     /**
      * verify process definition name unique
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index d4aa93e..2fdec30 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -72,8 +73,6 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.service.permission.PermissionCheck;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import org.apache.commons.lang.StringUtils;
-
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -163,15 +162,15 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * create process definition
      *
-     * @param loginUser login user
-     * @param projectCode project code
-     * @param name process definition name
-     * @param description description
-     * @param globalParams global params
-     * @param locations locations for nodes
-     * @param timeout timeout
-     * @param tenantCode tenantCode
-     * @param taskRelationJson relation json for nodes
+     * @param loginUser          login user
+     * @param projectCode        project code
+     * @param name               process definition name
+     * @param description        description
+     * @param globalParams       global params
+     * @param locations          locations for nodes
+     * @param timeout            timeout
+     * @param tenantCode         tenantCode
+     * @param taskRelationJson   relation json for nodes
      * @param taskDefinitionJson taskDefinitionJson
      * @return create result code
      */
@@ -186,7 +185,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                                                        int timeout,
                                                        String tenantCode,
                                                        String taskRelationJson,
-                                                       String 
taskDefinitionJson) {
+                                                       String 
taskDefinitionJson,
+                                                       
ProcessExecutionTypeEnum executionType) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -227,7 +227,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
         ProcessDefinition processDefinition = new 
ProcessDefinition(projectCode, name, processDefinitionCode, description,
-            globalParams, locations, timeout, loginUser.getId(), tenantId);
+                globalParams, locations, timeout, loginUser.getId(), tenantId);
+        processDefinition.setExecutionType(executionType);
+
         return createDagDefine(loginUser, taskRelationList, processDefinition, 
taskDefinitionLogs);
     }
 
@@ -292,8 +294,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 return result;
             }
             List<ProcessTaskRelation> processTaskRelations = 
taskRelationList.stream()
-                .map(processTaskRelationLog -> 
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), 
ProcessTaskRelation.class))
-                .collect(Collectors.toList());
+                    .map(processTaskRelationLog -> 
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), 
ProcessTaskRelation.class))
+                    .collect(Collectors.toList());
             List<TaskNode> taskNodeList = 
processService.transformTask(processTaskRelations, taskDefinitionLogs);
             if (taskNodeList.size() != taskRelationList.size()) {
                 Set<Long> postTaskCodes = 
taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
@@ -301,7 +303,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 Collection<Long> codes = 
CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
                 if (CollectionUtils.isNotEmpty(codes)) {
                     logger.error("the task code is not exit");
-                    putMsg(result, Status.TASK_DEFINE_NOT_EXIST, 
StringUtils.join(codes, Constants.COMMA));
+                    putMsg(result, Status.TASK_DEFINE_NOT_EXIST, 
org.apache.commons.lang.StringUtils.join(codes, Constants.COMMA));
                     return result;
                 }
             }
@@ -330,7 +332,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * query process definition list
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
      * @return definition list
      */
@@ -352,12 +354,12 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * query process definition list paging
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param searchVal search value
-     * @param userId user id
-     * @param pageNo page number
-     * @param pageSize page size
+     * @param searchVal   search value
+     * @param userId      user id
+     * @param pageNo      page number
+     * @param pageSize    page size
      * @return process definition page
      */
     @Override
@@ -374,7 +376,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
         Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
         IPage<ProcessDefinition> processDefinitionIPage = 
processDefinitionMapper.queryDefineListPaging(
-            page, searchVal, userId, project.getCode(), isAdmin(loginUser));
+                page, searchVal, userId, project.getCode(), 
isAdmin(loginUser));
 
         List<ProcessDefinition> records = processDefinitionIPage.getRecords();
         for (ProcessDefinition pd : records) {
@@ -395,9 +397,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * query detail of process definition
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param code process definition code
+     * @param code        process definition code
      * @return process definition detail
      */
     @Override
@@ -447,16 +449,16 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * update  process definition
      *
-     * @param loginUser login user
-     * @param projectCode project code
-     * @param name process definition name
-     * @param code process definition code
-     * @param description description
-     * @param globalParams global params
-     * @param locations locations for nodes
-     * @param timeout timeout
-     * @param tenantCode tenantCode
-     * @param taskRelationJson relation json for nodes
+     * @param loginUser          login user
+     * @param projectCode        project code
+     * @param name               process definition name
+     * @param code               process definition code
+     * @param description        description
+     * @param globalParams       global params
+     * @param locations          locations for nodes
+     * @param timeout            timeout
+     * @param tenantCode         tenantCode
+     * @param taskRelationJson   relation json for nodes
      * @param taskDefinitionJson taskDefinitionJson
      * @return update result code
      */
@@ -472,7 +474,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                                                        int timeout,
                                                        String tenantCode,
                                                        String taskRelationJson,
-                                                       String 
taskDefinitionJson) {
+                                                       String 
taskDefinitionJson,
+                                                       
ProcessExecutionTypeEnum executionType) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -522,6 +525,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
         ProcessDefinition processDefinitionDeepCopy = 
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), 
ProcessDefinition.class);
         processDefinition.set(projectCode, name, description, globalParams, 
locations, timeout, tenantId);
+        processDefinition.setExecutionType(executionType);
         return updateDagDefine(loginUser, taskRelationList, processDefinition, 
processDefinitionDeepCopy, taskDefinitionLogs);
     }
 
@@ -551,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
         int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
-            processDefinition.getCode(), insertVersion, taskRelationList, 
taskDefinitionLogs);
+                processDefinition.getCode(), insertVersion, taskRelationList, 
taskDefinitionLogs);
         if (insertResult == Constants.EXIT_CODE_SUCCESS) {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
@@ -565,9 +569,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * verify process definition name unique
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param name name
+     * @param name        name
      * @return true if process definition name not exists, otherwise false
      */
     @Override
@@ -590,9 +594,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * delete process definition by code
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param code process definition code
+     * @param code        process definition code
      * @return delete result code
      */
     @Override
@@ -661,9 +665,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * release process definition: online / offline
      *
-     * @param loginUser login user
-     * @param projectCode project code
-     * @param code process definition code
+     * @param loginUser    login user
+     * @param projectCode  project code
+     * @param code         process definition code
      * @param releaseState release state
      * @return release result code
      */
@@ -689,7 +693,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             case ONLINE:
                 // To check resources whether they are already cancel 
authorized or deleted
                 String resourceIds = processDefinition.getResourceIds();
-                if (StringUtils.isNotBlank(resourceIds)) {
+                if 
(org.apache.commons.lang.StringUtils.isNotBlank(resourceIds)) {
                     Integer[] resourceIdArray = 
Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
                     PermissionCheck<Integer> permissionCheck = new 
PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, 
resourceIdArray, loginUser.getId(), logger);
                     try {
@@ -708,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 processDefinition.setReleaseState(releaseState);
                 int updateProcess = 
processDefinitionMapper.updateById(processDefinition);
                 List<Schedule> scheduleList = 
scheduleMapper.selectAllByProcessDefineArray(
-                    new long[]{processDefinition.getCode()}
+                        new long[]{processDefinition.getCode()}
                 );
                 if (updateProcess > 0 && scheduleList.size() == 1) {
                     Schedule schedule = scheduleList.get(0);
@@ -737,7 +741,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      */
     @Override
     public void batchExportProcessDefinitionByCodes(User loginUser, long 
projectCode, String codes, HttpServletResponse response) {
-        if (StringUtils.isEmpty(codes)) {
+        if (org.apache.commons.lang.StringUtils.isEmpty(codes)) {
             return;
         }
         Project project = projectMapper.queryByCode(projectCode);
@@ -807,9 +811,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * import process definition
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param file process metadata json file
+     * @param file        process metadata json file
      * @return import process
      */
     @Override
@@ -1017,9 +1021,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * get task node details based on process definition
      *
-     * @param loginUser loginUser
+     * @param loginUser   loginUser
      * @param projectCode project code
-     * @param code process definition code
+     * @param code        process definition code
      * @return task node list
      */
     @Override
@@ -1046,9 +1050,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * get task node details map based on process definition
      *
-     * @param loginUser loginUser
+     * @param loginUser   loginUser
      * @param projectCode project code
-     * @param codes define codes
+     * @param codes       define codes
      * @return task node list
      */
     @Override
@@ -1083,7 +1087,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * query process definition all by project code
      *
-     * @param loginUser loginUser
+     * @param loginUser   loginUser
      * @param projectCode project code
      * @return process definitions in the project
      */
@@ -1105,7 +1109,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * Encapsulates the TreeView structure
      *
-     * @param code process definition code
+     * @param code  process definition code
      * @param limit limit
      * @return tree view json data
      */
@@ -1130,7 +1134,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         processInstanceList.forEach(processInstance -> 
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(),
 processInstance.getEndTime())));
         List<TaskDefinitionLog> taskDefinitionList = 
processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
 processDefinition.getCode()));
         Map<Long, TaskDefinitionLog> taskDefinitionMap = 
taskDefinitionList.stream()
-            .collect(Collectors.toMap(TaskDefinitionLog::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+                .collect(Collectors.toMap(TaskDefinitionLog::getCode, 
taskDefinitionLog -> taskDefinitionLog));
 
         if (limit > processInstanceList.size()) {
             limit = processInstanceList.size();
@@ -1145,8 +1149,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             ProcessInstance processInstance = processInstanceList.get(i);
             Date endTime = processInstance.getEndTime() == null ? new Date() : 
processInstance.getEndTime();
             parentTreeViewDto.getInstances().add(new 
Instance(processInstance.getId(), processInstance.getName(), 
processInstance.getProcessDefinitionCode(),
-                "", processInstance.getState().toString(), 
processInstance.getStartTime(), endTime, processInstance.getHost(),
-                DateUtils.format2Readable(endTime.getTime() - 
processInstance.getStartTime().getTime())));
+                    "", processInstance.getState().toString(), 
processInstance.getStartTime(), endTime, processInstance.getHost(),
+                    DateUtils.format2Readable(endTime.getTime() - 
processInstance.getStartTime().getTime())));
         }
 
         List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@@ -1184,11 +1188,11 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                         if (taskInstance.isSubProcess()) {
                             TaskDefinition taskDefinition = 
taskDefinitionMap.get(taskInstance.getTaskCode());
                             subProcessId = 
Integer.parseInt(JSONUtils.parseObject(
-                                
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
+                                    
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
                         }
                         treeViewDto.getInstances().add(new 
Instance(taskInstance.getId(), taskInstance.getName(), 
taskInstance.getTaskCode(),
-                            taskInstance.getTaskType(), 
taskInstance.getState().toString(), taskInstance.getStartTime(), 
taskInstance.getEndTime(),
-                            taskInstance.getHost(), 
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), 
subProcessId));
+                                taskInstance.getTaskType(), 
taskInstance.getState().toString(), taskInstance.getStartTime(), 
taskInstance.getEndTime(),
+                                taskInstance.getHost(), 
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), 
subProcessId));
                     }
                 }
                 for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@@ -1249,9 +1253,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * batch copy process definition
      *
-     * @param loginUser loginUser
-     * @param projectCode projectCode
-     * @param codes processDefinitionCodes
+     * @param loginUser         loginUser
+     * @param projectCode       projectCode
+     * @param codes             processDefinitionCodes
      * @param targetProjectCode targetProjectCode
      */
     @Override
@@ -1272,9 +1276,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * batch move process definition
      *
-     * @param loginUser loginUser
-     * @param projectCode projectCode
-     * @param codes processDefinitionCodes
+     * @param loginUser         loginUser
+     * @param projectCode       projectCode
+     * @param codes             processDefinitionCodes
      * @param targetProjectCode targetProjectCode
      */
     @Override
@@ -1307,7 +1311,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
-        if (StringUtils.isEmpty(processDefinitionCodes)) {
+        if 
(org.apache.commons.lang.StringUtils.isEmpty(processDefinitionCodes)) {
             putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY, 
processDefinitionCodes);
             return result;
         }
@@ -1337,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
         for (ProcessDefinition processDefinition : processDefinitionList) {
             List<ProcessTaskRelation> processTaskRelations =
-                
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
 processDefinition.getCode());
+                    
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
 processDefinition.getCode());
             List<ProcessTaskRelationLog> taskRelationList = 
processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
             processDefinition.setProjectCode(targetProjectCode);
             if (isCopy) {
@@ -1373,10 +1377,10 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * switch the defined process definition version
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param code process definition code
-     * @param version the version user want to switch
+     * @param code        process definition code
+     * @param version     the version user want to switch
      * @return switch process definition version result code
      */
     @Override
@@ -1412,11 +1416,11 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * check batch operate result
      *
-     * @param srcProjectCode srcProjectCode
+     * @param srcProjectCode    srcProjectCode
      * @param targetProjectCode targetProjectCode
-     * @param result result
+     * @param result            result
      * @param failedProcessList failedProcessList
-     * @param isCopy isCopy
+     * @param isCopy            isCopy
      */
     private void checkBatchOperateResult(long srcProjectCode, long 
targetProjectCode,
                                          Map<String, Object> result, 
List<String> failedProcessList, boolean isCopy) {
@@ -1434,11 +1438,11 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * query the pagination versions info by one certain process definition 
code
      *
-     * @param loginUser login user info to check auth
+     * @param loginUser   login user info to check auth
      * @param projectCode project code
-     * @param pageNo page number
-     * @param pageSize page size
-     * @param code process definition code
+     * @param pageNo      page number
+     * @param pageSize    page size
+     * @param code        process definition code
      * @return the pagination process definition versions info of the certain 
process definition
      */
     @Override
@@ -1468,10 +1472,10 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     /**
      * delete one certain process definition by version number and process 
definition code
      *
-     * @param loginUser login user info to check auth
+     * @param loginUser   login user info to check auth
      * @param projectCode project code
-     * @param code process definition code
-     * @param version version number
+     * @param code        process definition code
+     * @param version     version number
      * @return delete result code
      */
     @Override
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
index 4737c2f..2ac632f 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
@@ -22,6 +22,7 @@ import 
org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -94,10 +95,10 @@ public class ProcessDefinitionControllerTest {
         result.put(Constants.DATA_LIST, 1);
 
         Mockito.when(processDefinitionService.createProcessDefinition(user, 
projectCode, name, description, globalParams,
-                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson)).thenReturn(result);
+                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
 
         Result response = 
processDefinitionController.createProcessDefinition(user, projectCode, name, 
description, globalParams,
-                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson);
+                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL);
         Assert.assertEquals(Status.SUCCESS.getCode(), 
response.getCode().intValue());
     }
 
@@ -157,10 +158,10 @@ public class ProcessDefinitionControllerTest {
         result.put("processDefinitionId", 1);
 
         Mockito.when(processDefinitionService.updateProcessDefinition(user, 
projectCode, name, code, description, globalParams,
-                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson)).thenReturn(result);
+                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
 
         Result response = 
processDefinitionController.updateProcessDefinition(user, projectCode, name, 
code, description, globalParams,
-                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson, ReleaseState.OFFLINE);
+                locations, timeout, tenantCode, relationJson, 
taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL, ReleaseState.OFFLINE);
         Assert.assertEquals(Status.SUCCESS.getCode(), 
response.getCode().intValue());
     }
 
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 506bba3..3714de9 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
@@ -610,7 +611,7 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
 
         Map<String, Object> updateResult = 
processDefinitionService.updateProcessDefinition(loginUser, projectCode, 
"test", 1,
-                "", "", "", 0, "root", null, null);
+                "", "", "", 0, "root", null, null, 
ProcessExecutionTypeEnum.PARALLEL);
         Assert.assertEquals(Status.DATA_IS_NOT_VALID, 
updateResult.get(Constants.STATUS));
     }
 
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index fc9e36f..fa9a490 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -876,6 +876,12 @@ public final class Constants {
         ExecutionStatus.WAITING_DEPEND.ordinal()
     };
 
+    /**
+     * Numeric values of the execution states RUNNING_EXECUTION,
+     * SUBMITTED_SUCCESS and SERIAL_WAIT.
+     * NOTE(review): the values are taken from ordinal(), not getCode(); the two
+     * only agree while the declaration order of ExecutionStatus matches its
+     * codes - confirm before reordering or inserting constants.
+     */
+    public static final int[] RUNNING_PROCESS_STATE = new int[] {
+            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+            ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+            ExecutionStatus.SERIAL_WAIT.ordinal()
+    };
+
     /**
      * status
      */
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 4f24977..9bff968 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -50,7 +50,9 @@ public enum CommandType {
     REPEAT_RUNNING(7, "repeat running a process"),
     PAUSE(8, "pause a process"),
     STOP(9, "stop a process"),
-    RECOVER_WAITING_THREAD(10, "recover waiting thread");
+    RECOVER_WAITING_THREAD(10, "recover waiting thread"),
+    RECOVER_SERIAL_WAIT(11, "recover serial wait"),
+    ;
 
     CommandType(int code, String descp){
         this.code = code;
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 637eab2..99ba93e 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -56,7 +56,8 @@ public enum ExecutionStatus {
     WAITING_THREAD(10, "waiting thread"),
     WAITING_DEPEND(11, "waiting depend node complete"),
     DELAY_EXECUTION(12, "delay execution"),
-    FORCED_SUCCESS(13, "forced success");
+    FORCED_SUCCESS(13, "forced success"),
+    SERIAL_WAIT(14, "serial wait");
 
     ExecutionStatus(int code, String descp) {
         this.code = code;
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
new file mode 100644
index 0000000..50c46fd
--- /dev/null
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import java.util.HashMap;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+/**
+ * Execution type of a process definition: either parallel or one of the
+ * serial variants. Each constant carries a numeric code (the field annotated
+ * with {@code @EnumValue}) and a short description.
+ */
+public enum ProcessExecutionTypeEnum {
+
+    PARALLEL(0, "parallel"),
+    SERIAL_WAIT(1, "serial wait"),
+    SERIAL_DISCARD(2, "serial discard"),
+    SERIAL_PRIORITY(3, "serial priority");
+
+    ProcessExecutionTypeEnum(int code, String descp) {
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+
+    /**
+     * code -> constant lookup table; filled once in the static initializer
+     * below and never modified afterwards.
+     */
+    private static final HashMap<Integer, ProcessExecutionTypeEnum>
+            EXECUTION_TYPE_MAP = new HashMap<>();
+
+    static {
+        for (ProcessExecutionTypeEnum executionType : values()) {
+            EXECUTION_TYPE_MAP.put(executionType.code, executionType);
+        }
+    }
+
+    /**
+     * @return true for every type except {@link #PARALLEL}
+     */
+    public boolean typeIsSerial() {
+        return this != PARALLEL;
+    }
+
+    /**
+     * @return true if this is {@link #SERIAL_WAIT}
+     */
+    public boolean typeIsSerialWait() {
+        return this == SERIAL_WAIT;
+    }
+
+    /**
+     * @return true if this is {@link #SERIAL_DISCARD}
+     */
+    public boolean typeIsSerialDiscard() {
+        return this == SERIAL_DISCARD;
+    }
+
+    /**
+     * @return true if this is {@link #SERIAL_PRIORITY}
+     */
+    public boolean typeIsSerialPriority() {
+        return this == SERIAL_PRIORITY;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDescp() {
+        return descp;
+    }
+
+    /**
+     * Resolve an execution type from its numeric code.
+     *
+     * @param executionType numeric code of the execution type
+     * @return the constant whose code equals {@code executionType}
+     * @throws IllegalArgumentException if no constant has that code
+     */
+    public static ProcessExecutionTypeEnum of(int executionType) {
+        // single lookup instead of containsKey + get
+        ProcessExecutionTypeEnum type = EXECUTION_TYPE_MAP.get(executionType);
+        if (type == null) {
+            throw new IllegalArgumentException(
+                    "invalid execution type : " + executionType);
+        }
+        return type;
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 8e50ce8..2fbcac1 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.dao.entity;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -174,8 +175,12 @@ public class ProcessDefinition {
     @TableField(exist = false)
     private int warningGroupId;
 
-    public ProcessDefinition() {
-    }
+    /**
+     * execution type
+     */
+    private ProcessExecutionTypeEnum executionType;
+
+    public ProcessDefinition() { }
 
     public ProcessDefinition(long projectCode,
                              String name,
@@ -412,6 +417,14 @@ public class ProcessDefinition {
         this.warningGroupId = warningGroupId;
     }
 
+    /**
+     * @return the execution type stored on this process definition
+     */
+    public ProcessExecutionTypeEnum getExecutionType() {
+        return executionType;
+    }
+
+    /**
+     * @param executionType the execution type to store on this process definition
+     */
+    public void setExecutionType(ProcessExecutionTypeEnum executionType) {
+        this.executionType = executionType;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -430,6 +443,7 @@ public class ProcessDefinition {
             && Objects.equals(description, that.description)
             && Objects.equals(globalParams, that.globalParams)
             && flag == that.flag
+            && executionType == that.executionType
             && Objects.equals(locations, that.locations);
     }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
index 30840e8..ee11ba7 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
@@ -67,6 +67,7 @@ public class ProcessDefinitionLog extends ProcessDefinition {
         this.setModifyBy(processDefinition.getModifyBy());
         this.setResourceIds(processDefinition.getResourceIds());
         this.setWarningGroupId(processDefinition.getWarningGroupId());
+        this.setExecutionType(processDefinition.getExecutionType());
     }
 
     public int getOperator() {
@@ -89,4 +90,5 @@ public class ProcessDefinitionLog extends ProcessDefinition {
     public boolean equals(Object o) {
         return super.equals(o);
     }
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 18c386b..0b7ee2f 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -238,6 +238,10 @@ public class ProcessInstance {
      * varPool string
      */
     private String varPool;
+    /**
+     * serial queue next processInstanceId
+     */
+    private int nextProcessInstanceId;
 
     /**
      * dry run flag
@@ -706,4 +710,11 @@ public class ProcessInstance {
         return Objects.hash(id);
     }
 
+    /**
+     * @return id of the next process instance in the serial queue
+     */
+    public int getNextProcessInstanceId() {
+        return nextProcessInstanceId;
+    }
+
+    /**
+     * @param nextProcessInstanceId id of the next process instance in the serial queue
+     */
+    public void setNextProcessInstanceId(int nextProcessInstanceId) {
+        this.nextProcessInstanceId = nextProcessInstanceId;
+    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 7be58a7..6e85163 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -233,6 +233,13 @@ public interface ProcessInstanceMapper extends 
BaseMapper<ProcessInstance> {
     List<ProcessInstance> 
queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long 
processDefinitionCode,
                                                             @Param("states") 
int[] states);
 
+    /**
+     * Query the instances of one process definition that are in any of the
+     * given states, have next_process_instance_id = 0 and an id smaller than
+     * the given one; the mapped statement orders the result by id descending.
+     *
+     * @param processDefinitionCode process definition code
+     * @param states                state values to match
+     * @param id                    exclusive upper bound for the instance id
+     * @return matching process instances
+     */
+    List<ProcessInstance> 
queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long 
processDefinitionCode,
+                                                            @Param("states") 
int[] states, @Param("id") int id);
+
     int updateGlobalParamsById(@Param("globalParams") String globalParams,
                                @Param("id") int id);
+
+    /**
+     * Set next_process_instance_id of the instance {@code runningInstanceId}
+     * to {@code thisInstanceId}; the mapped statement only touches the row
+     * while its next_process_instance_id is still 0.
+     *
+     * @param thisInstanceId    id to store as the next process instance
+     * @param runningInstanceId id of the instance whose pointer is updated
+     * @return presumably true when a row was updated - confirm against the
+     *         MyBatis return-type mapping for update statements
+     */
+    boolean updateNextProcessIdById(@Param("thisInstanceId") int 
thisInstanceId, @Param("runningInstanceId")int runningInstanceId);
+
+    /**
+     * Load one instance of the given process definition that is in the given
+     * state (the mapped statement orders by id descending and limits to 1).
+     * NOTE(review): the mapped statement in ProcessInstanceMapper.xml also
+     * filters on #{id}, but no "id" parameter is declared here - confirm
+     * whether an id argument is missing from this signature.
+     *
+     * @param processDefinitionCode process definition code
+     * @param state                 state value to match
+     * @return the selected process instance
+     */
+    ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") 
Long processDefinitionCode,@Param("state") int state);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
index 40afa04..ee9f7c0 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -22,14 +22,14 @@
     <sql id="baseSql">
        id, code, name, version, description, project_code,
         release_state, user_id,global_params, flag, locations,
-         warning_group_id, timeout, tenant_id,operator, operate_time, 
create_time,
+         warning_group_id, timeout, tenant_id,operator,execution_type, 
operate_time, create_time,
           update_time
     </sql>
     <select id="queryByDefinitionName" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
         select pd.id, pd.code, pd.name, pd.version, pd.description, 
pd.project_code,
         pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations,
         pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, 
pd.operate_time, pd.create_time,
-        pd.update_time, u.user_name,p.name as project_name
+        pd.update_time, u.user_name,p.name as project_name ,pd.execution_type
         from t_ds_process_definition_log pd
         JOIN t_ds_user u ON pd.user_id = u.id
         JOIN  t_ds_project p ON pd.project_code = p.code
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index 9f76dd1..39314bb 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -20,12 +20,12 @@
 <mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper">
     <sql id="baseSql">
         id, code, name, version, release_state, project_code, user_id, 
description,
-        global_params, flag, locations, warning_group_id, create_time, 
timeout, tenant_id, update_time
+        global_params, flag, locations, warning_group_id, create_time, 
timeout, tenant_id, update_time,execution_type
     </sql>
 
     <select id="verifyByDefineName" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
         select pd.id, pd.code, pd.name, pd.version, pd.release_state, 
pd.project_code, pd.user_id, pd.description,
-        pd.global_params, pd.flag, pd.locations, pd.warning_group_id, 
pd.create_time, pd.timeout, pd.tenant_id, pd.update_time
+        pd.global_params, pd.flag, pd.locations, pd.warning_group_id, 
pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,pd.execution_type
         from t_ds_process_definition pd
         WHERE pd.project_code = #{projectCode}
         and pd.name = #{processDefinitionName}
@@ -58,7 +58,7 @@
     <select id="queryByDefineName" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
         select pd.id, pd.code, pd.name, pd.version, pd.release_state, 
pd.project_code, p.id as project_id, pd.user_id, pd.description,
         pd.global_params, pd.flag, pd.locations, pd.warning_group_id, 
pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,
-        u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name
+        u.user_name,p.name as 
project_name,t.tenant_code,q.queue,q.queue_name,pd.execution_type
         from t_ds_process_definition pd
         JOIN t_ds_user u ON pd.user_id = u.id
         JOIN  t_ds_project p ON pd.project_code = p.code
@@ -70,7 +70,7 @@
     <select id="queryDefineListPaging" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
         SELECT td.id, td.code, td.name, td.version, td.release_state, 
td.project_code, td.user_id, td.description,
         td.global_params, td.flag, td.warning_group_id, td.timeout, 
td.tenant_id, td.update_time, td.create_time,
-        sc.schedule_release_state, tu.user_name
+        sc.schedule_release_state, tu.user_name ,td.execution_type
         FROM t_ds_process_definition td
         left join (select process_definition_code,release_state as 
schedule_release_state from t_ds_schedules group by
         process_definition_code,release_state) sc on 
sc.process_definition_code = td.code
@@ -127,7 +127,7 @@
         SELECT
             pd.id, pd.code, pd.name, pd.version, pd.release_state, 
pd.project_code, pd.user_id, pd.description,
             pd.global_params, pd.flag, pd.locations, pd.warning_group_id, 
pd.create_time, pd.timeout,
-            pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name
+            pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name 
,pd.execution_type
         FROM
             t_ds_process_definition pd,
             t_ds_user u,
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
index 249fb86..1dc6168 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
@@ -19,7 +19,7 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"; >
 <mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper">
     <sql id="baseSql">
-        id, parent_process_instance_id, parent_task_instance_id, 
process_instance_id
+        id, parent_process_instance_id, parent_task_instance_id, 
process_instance_id,next_process_instance_id
     </sql>
     <delete id="deleteByParentProcessId">
         delete
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 08db3af..05a56a4 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -23,7 +23,7 @@
         command_type, command_param, task_depend_type, max_try_times, 
failure_strategy, warning_type,
         warning_group_id, schedule_time, command_start_time, global_params, 
flag,
         update_time, is_sub_process, executor_id, history_cmd,
-        process_instance_priority, worker_group,environment_code, timeout, 
tenant_id, var_pool, dry_run
+        process_instance_priority, worker_group,environment_code, timeout, 
tenant_id, var_pool, dry_run,next_process_instance_id
     </sql>
     <select id="queryDetailById" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
@@ -90,7 +90,7 @@
     <select id="queryProcessInstanceListPaging" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select instance.id, instance.command_type, instance.executor_id, 
instance.process_definition_version,
         instance.process_definition_code, instance.name, instance.state, 
instance.schedule_time, instance.start_time,
-        instance.end_time, instance.run_times, instance.recovery, 
instance.host, instance.dry_run
+        instance.end_time, instance.run_times, instance.recovery, 
instance.host, instance.dry_run ,instance.next_process_instance_id
         from t_ds_process_instance instance
         join t_ds_process_definition define ON 
instance.process_definition_code = define.code
         where instance.is_sub_process=0
@@ -218,9 +218,36 @@
         </foreach>
         order by id asc
     </select>
+    <!-- Instances of one process definition in any of the given states that
+         have no next instance recorded yet (next_process_instance_id = 0) and
+         an id below #{id}, highest id first. -->
+    <select id="queryByProcessDefineCodeAndStatusAndNextId" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_instance
+        where process_definition_code=#{processDefinitionCode}
+        and state in
+        <foreach collection="states" item="i" open="(" close=")" separator=",">
+            #{i}
+        </foreach>
+        and next_process_instance_id=0
+        and id <![CDATA[ < ]]> #{id}
+        order by id desc
+    </select>
+    <!-- Single instance of one process definition in the given state with an
+         id below #{id}, highest id first.
+         NOTE(review): #{id} is not among the @Param names declared on
+         ProcessInstanceMapper.loadNextProcess4Serial (processDefinitionCode,
+         state) - confirm the parameter binding. -->
+    <select id="loadNextProcess4Serial" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_instance
+        where process_definition_code=#{processDefinitionCode}
+        and state = #{state}
+        and id <![CDATA[ < ]]> #{id}
+        order by id desc limit 1
+    </select>
     <update id="updateGlobalParamsById">
         update t_ds_process_instance
         set global_params = #{globalParams}
         where id = #{id}
     </update>
+    <!-- Record #{thisInstanceId} as the next instance of #{runningInstanceId};
+         the extra next_process_instance_id=0 condition leaves rows that
+         already have a next instance untouched. -->
+    <update id="updateNextProcessIdById">
+        update t_ds_process_instance
+        set next_process_instance_id = #{thisInstanceId}
+        where id = #{runningInstanceId} and next_process_instance_id=0
+    </update>
 </mapper>
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 500f2d2..f4527b8 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -281,6 +281,7 @@ class ProcessDefinition(Base):
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
+            None,
         )
         return self._process_definition_code
 
diff --git 
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
 
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 08390e9..4e15fbb 100644
--- 
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ 
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -175,7 +176,8 @@ public class PythonGatewayServer extends 
SpringBootServletInitializer {
                                                 int timeout,
                                                 String tenantCode,
                                                 String taskRelationJson,
-                                                String taskDefinitionJson) {
+                                                String taskDefinitionJson,
+                                                ProcessExecutionTypeEnum 
executionType) {
         User user = usersService.queryUser(userName);
         Project project = (Project) projectService.queryByName(user, 
projectName).get(Constants.DATA_LIST);
         long projectCode = project.getCode();
@@ -189,12 +191,12 @@ public class PythonGatewayServer extends 
SpringBootServletInitializer {
             // make sure process definition offline which could edit
             processDefinitionService.releaseProcessDefinition(user, 
projectCode, processDefinitionCode, ReleaseState.OFFLINE);
             Map<String, Object> result = 
processDefinitionService.updateProcessDefinition(user, projectCode, name, 
processDefinitionCode, description, globalParams,
-                locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson);
+                locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson,executionType);
             return processDefinitionCode;
         } else if (verifyStatus == Status.SUCCESS) {
             // create process definition
             Map<String, Object> result = 
processDefinitionService.createProcessDefinition(user, projectCode, name, 
description, globalParams,
-                locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson);
+                locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson,executionType);
             ProcessDefinition processDefinition = (ProcessDefinition) 
result.get(Constants.DATA_LIST);
             return processDefinition.getCode();
         } else {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index c501262..6ed8912 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static 
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
@@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -510,6 +512,10 @@ public class WorkflowExecuteThread implements Runnable {
     private void endProcess() {
         this.stateEvents.clear();
         processInstance.setEndTime(new Date());
+        ProcessDefinition processDefinition = 
this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());
+        if (processDefinition.getExecutionType().typeIsSerialWait()) {
+            checkSerialProcess(processDefinition);
+        }
         processService.updateProcessInstance(processInstance);
         if (processInstance.getState().typeIsWaitingThread()) {
             processService.createRecoveryWaitingThreadCommand(null, 
processInstance);
@@ -519,6 +525,29 @@ public class WorkflowExecuteThread implements Runnable {
         processAlertManager.sendAlertProcessInstance(processInstance, 
taskInstances, projectUser);
     }
 
+    /**
+     * After this process instance has ended, trigger the next instance queued behind it
+     * for a serially executed process definition.
+     *
+     * <p>The current instance is re-read through the process service, then the next
+     * instance is resolved either from its {@code nextProcessInstanceId} or, when that
+     * id is 0, via {@code loadNextProcess4Serial} with the SERIAL_WAIT status code.
+     * If a next instance exists and is neither finished nor running, a
+     * {@code RECOVER_SERIAL_WAIT} command carrying its id is created.
+     *
+     * @param processDefinition definition whose code is used for the lookup and the command
+     */
+    public void checkSerialProcess(ProcessDefinition processDefinition) {
+        ProcessInstance latestInstance = processService.findProcessInstanceById(processInstance.getId());
+        if (latestInstance != null) {
+            // keep the in-memory instance when the reload finds nothing, instead of nulling the field
+            this.processInstance = latestInstance;
+        }
+        int nextInstanceId = processInstance.getNextProcessInstanceId();
+        if (nextInstanceId == 0) {
+            // use the definition passed in (as the command below does) rather than
+            // processInstance.getProcessDefinition(), which may not be populated on a reloaded instance
+            ProcessInstance waitingInstance = this.processService.loadNextProcess4Serial(
+                    processDefinition.getCode(), ExecutionStatus.SERIAL_WAIT.getCode());
+            if (waitingInstance == null) {
+                return;
+            }
+            nextInstanceId = waitingInstance.getId();
+        }
+        ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId);
+        if (nextProcessInstance == null) {
+            // the linked instance can no longer be found, so there is nothing to recover
+            return;
+        }
+        if (nextProcessInstance.getState().typeIsFinished() || nextProcessInstance.getState().typeIsRunning()) {
+            return;
+        }
+        Map<String, Object> cmdParam = new HashMap<>();
+        cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId);
+        Command command = new Command();
+        command.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+        command.setProcessDefinitionCode(processDefinition.getCode());
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+        processService.createCommand(command);
+    }
+
     /**
      * generate process dag
      *
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 1b4d3bf..32b2b5e 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -111,9 +112,7 @@ public class WorkflowExecuteThreadTest {
         Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
         dag.setAccessible(true);
         dag.set(workflowExecuteThread, new DAG());
-        PowerMockito.doNothing().when(workflowExecuteThread, "executeProcess");
         PowerMockito.doNothing().when(workflowExecuteThread, "prepareProcess");
-        PowerMockito.doNothing().when(workflowExecuteThread, "runProcess");
         PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
     }
 
@@ -256,6 +255,36 @@ public class WorkflowExecuteThreadTest {
         }
     }
 
+    /**
+     * Runs checkSerialProcess twice against the mocked process service: first with no
+     * next-instance id stubbed on the current instance, then with the current instance
+     * pointing at instance 222, which is in SERIAL_WAIT. Any exception thrown by the
+     * method fails the test with its original stack trace.
+     */
+    @Test
+    public void testCheckSerialProcess() {
+        ProcessDefinition serialDefinition = new ProcessDefinition();
+        serialDefinition.setId(123);
+        serialDefinition.setName("test");
+        serialDefinition.setVersion(1);
+        serialDefinition.setCode(11L);
+        serialDefinition.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT);
+
+        // case 1: the current instance (id 225) has no next instance id stubbed
+        Mockito.when(processInstance.getId()).thenReturn(225);
+        Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance);
+        workflowExecuteThread.checkSerialProcess(serialDefinition);
+
+        // case 2: the current instance links to instance 222, which is waiting in the serial queue
+        Mockito.when(processInstance.getNextProcessInstanceId()).thenReturn(222);
+        ProcessInstance waitingInstance = new ProcessInstance();
+        waitingInstance.setId(222);
+        waitingInstance.setProcessDefinitionCode(11L);
+        waitingInstance.setProcessDefinitionVersion(1);
+        waitingInstance.setState(ExecutionStatus.SERIAL_WAIT);
+        Mockito.when(processService.findProcessInstanceById(222)).thenReturn(waitingInstance);
+        workflowExecuteThread.checkSerialProcess(serialDefinition);
+    }
+
     private List<Schedule> zeroSchedulerList() {
         return Collections.emptyList();
     }
@@ -268,4 +297,4 @@ public class WorkflowExecuteThreadTest {
         return schedulerList;
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 1c8e72c..c13fbda 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -100,6 +100,8 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -200,6 +202,10 @@ public class ProcessService {
     @Autowired
     private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
 
+
+    @Autowired
+    StateEventCallbackService stateEventCallbackService;
+
     @Autowired
     private EnvironmentMapper environmentMapper;
 
@@ -222,12 +228,77 @@ public class ProcessService {
         }
         processInstance.setCommandType(command.getCommandType());
         processInstance.addHistoryCmd(command.getCommandType());
-        saveProcessInstance(processInstance);
+        // if the process definition is serial
+        ProcessDefinition processDefinition = 
this.findProcessDefinition(processInstance.getProcessDefinitionCode(), 
processInstance.getProcessDefinitionVersion());
+        if (processDefinition.getExecutionType().typeIsSerial()) {
+            saveSerialProcess(processInstance,processDefinition);
+            if (processInstance.getState() != 
ExecutionStatus.SUBMITTED_SUCCESS) {
+                this.setSubProcessParam(processInstance);
+                this.commandMapper.deleteById(command.getId());
+                return null;
+            }
+        } else {
+            saveProcessInstance(processInstance);
+        }
         this.setSubProcessParam(processInstance);
         this.commandMapper.deleteById(command.getId());
         return processInstance;
     }
 
+    /**
+     * Save a new instance of a serially executed process definition and set its state
+     * according to the definition's execution type.
+     *
+     * <p>The instance is first saved as SERIAL_WAIT. Then:
+     * <ul>
+     *   <li>serial wait: if the mapper query returns no instance, the new instance becomes
+     *       SUBMITTED_SUCCESS; otherwise it is linked as the next instance of the first
+     *       returned one, retrying until the link update succeeds;</li>
+     *   <li>serial discard: if the mapper query returns no instance, the new instance is
+     *       set to STOP;</li>
+     *   <li>serial priority: every returned instance is marked READY_STOP and, when the
+     *       update succeeds, a state event is sent to the host recorded on it.</li>
+     * </ul>
+     *
+     * @param processInstance   the instance being created; its state is modified in place
+     * @param processDefinition the definition that supplies the execution type
+     */
+    private void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
+        processInstance.setState(ExecutionStatus.SERIAL_WAIT);
+        saveProcessInstance(processInstance);
+        //serial wait
+        //when we get the running instance(or waiting instance) only get the priority instance(by id)
+        if (processDefinition.getExecutionType().typeIsSerialWait()) {
+            while (true) {
+                List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(
+                        processInstance.getProcessDefinitionCode(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+                if (CollectionUtils.isEmpty(runningProcessInstances)) {
+                    processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                    saveProcessInstance(processInstance);
+                    return;
+                }
+                ProcessInstance runningProcess = runningProcessInstances.get(0);
+                // a failed link update re-runs the query and tries again
+                if (this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(), runningProcess.getId())) {
+                    return;
+                }
+            }
+        } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
+            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(
+                    processInstance.getProcessDefinitionCode(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+            // NOTE(review): the new instance is stopped when NO instance is returned and is left in
+            // SERIAL_WAIT otherwise; this looks inverted for a "discard" policy - confirm the intent
+            if (CollectionUtils.isEmpty(runningProcessInstances)) {
+                processInstance.setState(ExecutionStatus.STOP);
+                saveProcessInstance(processInstance);
+            }
+        } else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
+            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(
+                    processInstance.getProcessDefinitionCode(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+            // NOTE(review): the new instance itself stays in SERIAL_WAIT in this branch - confirm
+            // which component is expected to promote it
+            if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
+                for (ProcessInstance info : runningProcessInstances) {
+                    info.setCommandType(CommandType.STOP);
+                    info.addHistoryCmd(CommandType.STOP);
+                    info.setState(ExecutionStatus.READY_STOP);
+                    int update = updateProcessInstance(info);
+                    // determine whether the process is normal
+                    if (update > 0) {
+                        notifySerialPriorityStop(info);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Best-effort notification of a state change to the host recorded on the given instance.
+     * A missing or malformed host, or a failure while sending, is logged and does not
+     * interrupt the handling of the remaining instances.
+     *
+     * @param info instance whose state was just updated; its host is read as "address:port"
+     */
+    private void notifySerialPriorityStop(ProcessInstance info) {
+        String host = info.getHost();
+        String[] hostParts = host == null ? new String[0] : host.split(":");
+        if (hostParts.length < 2) {
+            logger.error("sendResultError, invalid host, process instance id: {}, host: {}", info.getId(), host);
+            return;
+        }
+        try {
+            int port = Integer.parseInt(hostParts[1]);
+            StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
+                    info.getId(), 0, info.getState(), info.getId(), 0
+            );
+            stateEventCallbackService.sendResult(hostParts[0], port, stateEventChangeCommand.convert2Command());
+        } catch (Exception e) {
+            // keep the cause and the target so a failed notification can be diagnosed
+            logger.error("sendResultError, process instance id: {}, host: {}", info.getId(), host, e);
+        }
+    }
+
     /**
      * save error command, and delete original command
      *
@@ -2495,4 +2566,8 @@ public class ProcessService {
         }
         return processTaskMap;
     }
+
+    /**
+     * Delegates to {@code processInstanceMapper.loadNextProcess4Serial} to look up the
+     * next instance for a serially executed process definition.
+     *
+     * @param code  process definition code
+     * @param state execution status code forwarded to the mapper
+     * @return the instance returned by the mapper (the visible caller checks it for null)
+     */
+    public ProcessInstance loadNextProcess4Serial(long code, int state) {
+        final ProcessInstance nextInstance = this.processInstanceMapper.loadNextProcess4Serial(code, state);
+        return nextInstance;
+    }
 }
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 2112563..b55aee5 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
@@ -86,7 +87,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 /**
  * process service test
  */
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
 public class ProcessServiceTest {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CronUtilsTest.class);
@@ -257,16 +258,25 @@ public class ProcessServiceTest {
         command1.setProcessDefinitionVersion(definitionVersion);
         command1.setCommandParam("{\"ProcessInstanceId\":222}");
         command1.setCommandType(CommandType.START_PROCESS);
+
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setId(123);
         processDefinition.setName("test");
         processDefinition.setVersion(definitionVersion);
         processDefinition.setCode(definitionCode);
         
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
+        processDefinition.setExecutionType(ProcessExecutionTypeEnum.PARALLEL);
+
         ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(222);
+        processInstance.setProcessDefinitionCode(11L);
+        processInstance.setHost("127.0.0.1:5678");
+        processInstance.setProcessDefinitionVersion(1);
         processInstance.setId(processInstanceId);
         processInstance.setProcessDefinitionCode(definitionCode);
         processInstance.setProcessDefinitionVersion(definitionVersion);
+
+        
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
         
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
             processInstance.getProcessDefinitionVersion())).thenReturn(new 
ProcessDefinitionLog(processDefinition));
         
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
@@ -309,6 +319,77 @@ public class ProcessServiceTest {
         command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
         ProcessInstance processInstance1 = 
processService.handleCommand(logger, host, command5, 
processDefinitionCacheMaps);
         
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
+
+        ProcessDefinition processDefinition1 = new ProcessDefinition();
+        processDefinition1.setId(123);
+        processDefinition1.setName("test");
+        processDefinition1.setVersion(1);
+        processDefinition1.setCode(11L);
+        processDefinition1.setVersion(1);
+        
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT);
+        List<ProcessInstance> lists = new ArrayList<>();
+        ProcessInstance processInstance11 = new ProcessInstance();
+        processInstance11.setId(222);
+        processInstance11.setProcessDefinitionCode(11L);
+        processInstance11.setProcessDefinitionVersion(1);
+        processInstance11.setHost("127.0.0.1:5678");
+        lists.add(processInstance11);
+
+        ProcessInstance processInstance2 = new ProcessInstance();
+        processInstance2.setId(223);
+        processInstance2.setProcessDefinitionCode(11L);
+        processInstance2.setProcessDefinitionVersion(1);
+        
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
+        
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
+        Assert.assertNotNull(processService.handleCommand(logger, host, 
command1, processDefinitionCacheMaps));
+        Command command6 = new Command();
+        command6.setProcessDefinitionCode(11L);
+        command6.setCommandParam("{\"ProcessInstanceId\":223}");
+        command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+        command6.setProcessDefinitionVersion(1);
+        
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,223)).thenReturn(lists);
+        Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 
222)).thenReturn(true);
+        ProcessInstance processInstance6 = 
processService.handleCommand(logger, host, command6, 
processDefinitionCacheMaps);
+        Assert.assertTrue(processInstance6 != null);
+
+        
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
+        
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
+        ProcessInstance processInstance7 = new ProcessInstance();
+        processInstance7.setId(224);
+        processInstance7.setProcessDefinitionCode(11L);
+        processInstance7.setProcessDefinitionVersion(1);
+        
Mockito.when(processInstanceMapper.queryDetailById(224)).thenReturn(processInstance7);
+
+        Command command7 = new Command();
+        command7.setProcessDefinitionCode(11L);
+        command7.setCommandParam("{\"ProcessInstanceId\":224}");
+        command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+        command7.setProcessDefinitionVersion(1);
+        
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,224)).thenReturn(null);
+        ProcessInstance processInstance8 = 
processService.handleCommand(logger, host, command7, 
processDefinitionCacheMaps);
+        Assert.assertTrue(processInstance8 == null);
+
+        ProcessDefinition processDefinition2 = new ProcessDefinition();
+        processDefinition2.setId(123);
+        processDefinition2.setName("test");
+        processDefinition2.setVersion(1);
+        processDefinition2.setCode(12L);
+        
processDefinition2.setExecutionType(ProcessExecutionTypeEnum.SERIAL_PRIORITY);
+        
Mockito.when(processDefineMapper.queryByCode(12L)).thenReturn(processDefinition2);
+        ProcessInstance processInstance9 = new ProcessInstance();
+        processInstance9.setId(225);
+        processInstance9.setProcessDefinitionCode(11L);
+        processInstance9.setProcessDefinitionVersion(1);
+        Command command9 = new Command();
+        command9.setProcessDefinitionCode(12L);
+        command9.setCommandParam("{\"ProcessInstanceId\":225}");
+        command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+        command9.setProcessDefinitionVersion(1);
+        
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
+        
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L,Constants.RUNNING_PROCESS_STATE,0)).thenReturn(lists);
+        
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
+        ProcessInstance processInstance10 = 
processService.handleCommand(logger, host, command9, 
processDefinitionCacheMaps);
+        Assert.assertTrue(processInstance10 == null);
     }
 
     @Test
diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql
index 172686f..7c4f301 100644
--- a/sql/dolphinscheduler_h2.sql
+++ b/sql/dolphinscheduler_h2.sql
@@ -412,6 +412,7 @@ CREATE TABLE t_ds_process_definition
     warning_group_id int(11) DEFAULT NULL,
     timeout          int(11) DEFAULT '0',
     tenant_id        int(11) NOT NULL DEFAULT '-1',
+    execution_type   tinyint(4) DEFAULT '0',
     create_time      datetime NOT NULL,
     update_time      datetime     DEFAULT NULL,
     PRIMARY KEY (id),
@@ -443,6 +444,7 @@ CREATE TABLE t_ds_process_definition_log
     warning_group_id int(11) DEFAULT NULL,
     timeout          int(11) DEFAULT '0',
     tenant_id        int(11) NOT NULL DEFAULT '-1',
+    execution_type   tinyint(4) DEFAULT '0',
     operator         int(11) DEFAULT NULL,
     operate_time     datetime     DEFAULT NULL,
     create_time      datetime NOT NULL,
@@ -595,6 +597,7 @@ CREATE TABLE t_ds_process_instance
     worker_group               varchar(64)  DEFAULT NULL,
     environment_code           bigint(20) DEFAULT '-1',
     timeout                    int(11) DEFAULT '0',
+    next_process_instance_id   int(11) DEFAULT '0',
     tenant_id                  int(11) NOT NULL DEFAULT '-1',
     var_pool                   longtext,
     dry_run                    int NULL DEFAULT 0,
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 5a27912..cd4c84b 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -414,6 +414,7 @@ CREATE TABLE `t_ds_process_definition` (
   `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
   `timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute',
   `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
+  `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 
0:parallel,1:serial wait,2:serial discard,3:serial priority',
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime NOT NULL COMMENT 'update time',
   PRIMARY KEY (`id`,`code`),
@@ -443,6 +444,7 @@ CREATE TABLE `t_ds_process_definition_log` (
   `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
   `timeout` int(11) DEFAULT '0' COMMENT 'time out,unit: minute',
   `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
+  `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 
0:parallel,1:serial wait,2:serial discard,3:serial priority',
   `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
   `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
   `create_time` datetime NOT NULL COMMENT 'create time',
@@ -593,6 +595,7 @@ CREATE TABLE `t_ds_process_instance` (
   `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
   `var_pool` longtext COMMENT 'var_pool',
   `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
+  `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next 
processInstanceId',
   PRIMARY KEY (`id`),
   KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
   KEY `start_time_index` (`start_time`) USING BTREE
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 826e377..8c39bb1 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -331,6 +331,7 @@ CREATE TABLE t_ds_process_definition (
   flag int DEFAULT NULL ,
   timeout int DEFAULT '0' ,
   tenant_id int DEFAULT '-1' ,
+  execution_type int DEFAULT '0',
   create_time timestamp DEFAULT NULL ,
   update_time timestamp DEFAULT NULL ,
   PRIMARY KEY (id) ,
@@ -355,6 +356,7 @@ CREATE TABLE t_ds_process_definition_log (
   flag int DEFAULT NULL ,
   timeout int DEFAULT '0' ,
   tenant_id int DEFAULT '-1' ,
+  execution_type int DEFAULT '0',
   operator int DEFAULT NULL ,
   operate_time timestamp DEFAULT NULL ,
   create_time timestamp DEFAULT NULL ,
@@ -498,6 +500,7 @@ CREATE TABLE t_ds_process_instance (
   tenant_id int NOT NULL DEFAULT '-1' ,
   var_pool text ,
   dry_run int DEFAULT '0' ,
+  next_process_instance_id int DEFAULT '0',
   PRIMARY KEY (id)
 ) ;
 

Reply via email to