This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new dc8b87e [Feature-#6268][server-master] Serial execte process (#6185)
dc8b87e is described below
commit dc8b87e473886723cba02b064bcde6d7947891d6
Author: wangxj3 <[email protected]>
AuthorDate: Fri Nov 5 17:25:45 2021 +0800
[Feature-#6268][server-master] Serial execte process (#6185)
* add serial processInstance
* fix bug
* add test
* fix code style
* fix style code
* add sql
* fix sql error
* add api
* add test
* fix code style
* modify api
* delete column , fix mapper
* fix unimport
* fix test
* fix bug of missing param for Python
* fix code style
* fix test
* fix test
Co-authored-by: wangxj <wangxj31>
---
.../controller/ProcessDefinitionController.java | 9 +-
.../api/service/ProcessDefinitionService.java | 7 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 168 +++++++++++----------
.../ProcessDefinitionControllerTest.java | 9 +-
.../api/service/ProcessDefinitionServiceTest.java | 3 +-
.../apache/dolphinscheduler/common/Constants.java | 6 +
.../dolphinscheduler/common/enums/CommandType.java | 4 +-
.../common/enums/ExecutionStatus.java | 3 +-
.../common/enums/ProcessExecutionTypeEnum.java | 79 ++++++++++
.../dao/entity/ProcessDefinition.java | 18 ++-
.../dao/entity/ProcessDefinitionLog.java | 2 +
.../dao/entity/ProcessInstance.java | 11 ++
.../dao/mapper/ProcessInstanceMapper.java | 7 +
.../dao/mapper/ProcessDefinitionLogMapper.xml | 4 +-
.../dao/mapper/ProcessDefinitionMapper.xml | 10 +-
.../dao/mapper/ProcessInstanceMapMapper.xml | 2 +-
.../dao/mapper/ProcessInstanceMapper.xml | 31 +++-
.../pydolphinscheduler/core/process_definition.py | 1 +
.../server/PythonGatewayServer.java | 8 +-
.../master/runner/WorkflowExecuteThread.java | 29 ++++
.../server/master/WorkflowExecuteThreadTest.java | 35 ++++-
.../service/process/ProcessService.java | 77 +++++++++-
.../service/process/ProcessServiceTest.java | 83 +++++++++-
sql/dolphinscheduler_h2.sql | 3 +
sql/dolphinscheduler_mysql.sql | 3 +
sql/dolphinscheduler_postgre.sql | 3 +
26 files changed, 501 insertions(+), 114 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index d6cb1ba..6257918 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -41,6 +41,7 @@ import
org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -126,9 +127,10 @@ public class ProcessDefinitionController extends
BaseController {
@RequestParam(value = "timeout",
required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode",
required = true) String tenantCode,
@RequestParam(value =
"taskRelationJson", required = true) String taskRelationJson,
- @RequestParam(value =
"taskDefinitionJson", required = true) String taskDefinitionJson) {
+ @RequestParam(value =
"taskDefinitionJson", required = true) String taskDefinitionJson,
+ @RequestParam(value =
"executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum
executionType) {
Map<String, Object> result =
processDefinitionService.createProcessDefinition(loginUser, projectCode, name,
description, globalParams,
- locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson);
+ locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson,executionType);
return returnDataList(result);
}
@@ -244,10 +246,11 @@ public class ProcessDefinitionController extends
BaseController {
@RequestParam(value = "tenantCode",
required = true) String tenantCode,
@RequestParam(value =
"taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value =
"taskDefinitionJson", required = true) String taskDefinitionJson,
+ @RequestParam(value =
"executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum
executionType,
@RequestParam(value =
"releaseState", required = false, defaultValue = "OFFLINE") ReleaseState
releaseState) {
Map<String, Object> result =
processDefinitionService.updateProcessDefinition(loginUser, projectCode, name,
code, description, globalParams,
- locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson);
+ locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson,executionType);
// If the update fails, the result will be returned directly
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataList(result);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 42fce02..723717d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -56,7 +57,8 @@ public interface ProcessDefinitionService {
int timeout,
String tenantCode,
String taskRelationJson,
- String taskDefinitionJson);
+ String taskDefinitionJson,
+ ProcessExecutionTypeEnum
executionType);
/**
* query process definition list
@@ -164,7 +166,8 @@ public interface ProcessDefinitionService {
int timeout,
String tenantCode,
String taskRelationJson,
- String taskDefinitionJson);
+ String taskDefinitionJson,
+ ProcessExecutionTypeEnum
executionType);
/**
* verify process definition name unique
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index d4aa93e..2fdec30 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -72,8 +73,6 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.commons.lang.StringUtils;
-
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -163,15 +162,15 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* create process definition
*
- * @param loginUser login user
- * @param projectCode project code
- * @param name process definition name
- * @param description description
- * @param globalParams global params
- * @param locations locations for nodes
- * @param timeout timeout
- * @param tenantCode tenantCode
- * @param taskRelationJson relation json for nodes
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param name process definition name
+ * @param description description
+ * @param globalParams global params
+ * @param locations locations for nodes
+ * @param timeout timeout
+ * @param tenantCode tenantCode
+ * @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@@ -186,7 +185,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
int timeout,
String tenantCode,
String taskRelationJson,
- String
taskDefinitionJson) {
+ String
taskDefinitionJson,
+
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -227,7 +227,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new
ProcessDefinition(projectCode, name, processDefinitionCode, description,
- globalParams, locations, timeout, loginUser.getId(), tenantId);
+ globalParams, locations, timeout, loginUser.getId(), tenantId);
+ processDefinition.setExecutionType(executionType);
+
return createDagDefine(loginUser, taskRelationList, processDefinition,
taskDefinitionLogs);
}
@@ -292,8 +294,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
List<ProcessTaskRelation> processTaskRelations =
taskRelationList.stream()
- .map(processTaskRelationLog ->
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
ProcessTaskRelation.class))
- .collect(Collectors.toList());
+ .map(processTaskRelationLog ->
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
ProcessTaskRelation.class))
+ .collect(Collectors.toList());
List<TaskNode> taskNodeList =
processService.transformTask(processTaskRelations, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes =
taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
@@ -301,7 +303,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
Collection<Long> codes =
CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
if (CollectionUtils.isNotEmpty(codes)) {
logger.error("the task code is not exit");
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
StringUtils.join(codes, Constants.COMMA));
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
org.apache.commons.lang.StringUtils.join(codes, Constants.COMMA));
return result;
}
}
@@ -330,7 +332,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* query process definition list
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
* @return definition list
*/
@@ -352,12 +354,12 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* query process definition list paging
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param searchVal search value
- * @param userId user id
- * @param pageNo page number
- * @param pageSize page size
+ * @param searchVal search value
+ * @param userId user id
+ * @param pageNo page number
+ * @param pageSize page size
* @return process definition page
*/
@Override
@@ -374,7 +376,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage =
processDefinitionMapper.queryDefineListPaging(
- page, searchVal, userId, project.getCode(), isAdmin(loginUser));
+ page, searchVal, userId, project.getCode(),
isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
for (ProcessDefinition pd : records) {
@@ -395,9 +397,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* query detail of process definition
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param code process definition code
+ * @param code process definition code
* @return process definition detail
*/
@Override
@@ -447,16 +449,16 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* update process definition
*
- * @param loginUser login user
- * @param projectCode project code
- * @param name process definition name
- * @param code process definition code
- * @param description description
- * @param globalParams global params
- * @param locations locations for nodes
- * @param timeout timeout
- * @param tenantCode tenantCode
- * @param taskRelationJson relation json for nodes
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param name process definition name
+ * @param code process definition code
+ * @param description description
+ * @param globalParams global params
+ * @param locations locations for nodes
+ * @param timeout timeout
+ * @param tenantCode tenantCode
+ * @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@@ -472,7 +474,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
int timeout,
String tenantCode,
String taskRelationJson,
- String
taskDefinitionJson) {
+ String
taskDefinitionJson,
+
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -522,6 +525,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinitionDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition),
ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams,
locations, timeout, tenantId);
+ processDefinition.setExecutionType(executionType);
return updateDagDefine(loginUser, taskRelationList, processDefinition,
processDefinitionDeepCopy, taskDefinitionLogs);
}
@@ -551,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
- processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
+ processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -565,9 +569,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* verify process definition name unique
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param name name
+ * @param name name
* @return true if process definition name not exists, otherwise false
*/
@Override
@@ -590,9 +594,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* delete process definition by code
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param code process definition code
+ * @param code process definition code
* @return delete result code
*/
@Override
@@ -661,9 +665,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* release process definition: online / offline
*
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param code process definition code
* @param releaseState release state
* @return release result code
*/
@@ -689,7 +693,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
case ONLINE:
// To check resources whether they are already cancel
authorized or deleted
String resourceIds = processDefinition.getResourceIds();
- if (StringUtils.isNotBlank(resourceIds)) {
+ if
(org.apache.commons.lang.StringUtils.isNotBlank(resourceIds)) {
Integer[] resourceIdArray =
Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
PermissionCheck<Integer> permissionCheck = new
PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService,
resourceIdArray, loginUser.getId(), logger);
try {
@@ -708,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processDefinition.setReleaseState(releaseState);
int updateProcess =
processDefinitionMapper.updateById(processDefinition);
List<Schedule> scheduleList =
scheduleMapper.selectAllByProcessDefineArray(
- new long[]{processDefinition.getCode()}
+ new long[]{processDefinition.getCode()}
);
if (updateProcess > 0 && scheduleList.size() == 1) {
Schedule schedule = scheduleList.get(0);
@@ -737,7 +741,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
*/
@Override
public void batchExportProcessDefinitionByCodes(User loginUser, long
projectCode, String codes, HttpServletResponse response) {
- if (StringUtils.isEmpty(codes)) {
+ if (org.apache.commons.lang.StringUtils.isEmpty(codes)) {
return;
}
Project project = projectMapper.queryByCode(projectCode);
@@ -807,9 +811,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* import process definition
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param file process metadata json file
+ * @param file process metadata json file
* @return import process
*/
@Override
@@ -1017,9 +1021,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* get task node details based on process definition
*
- * @param loginUser loginUser
+ * @param loginUser loginUser
* @param projectCode project code
- * @param code process definition code
+ * @param code process definition code
* @return task node list
*/
@Override
@@ -1046,9 +1050,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* get task node details map based on process definition
*
- * @param loginUser loginUser
+ * @param loginUser loginUser
* @param projectCode project code
- * @param codes define codes
+ * @param codes define codes
* @return task node list
*/
@Override
@@ -1083,7 +1087,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* query process definition all by project code
*
- * @param loginUser loginUser
+ * @param loginUser loginUser
* @param projectCode project code
* @return process definitions in the project
*/
@@ -1105,7 +1109,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* Encapsulates the TreeView structure
*
- * @param code process definition code
+ * @param code process definition code
* @param limit limit
* @return tree view json data
*/
@@ -1130,7 +1134,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processInstanceList.forEach(processInstance ->
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(),
processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList =
processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap =
taskDefinitionList.stream()
- .collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
+ .collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
if (limit > processInstanceList.size()) {
limit = processInstanceList.size();
@@ -1145,8 +1149,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessInstance processInstance = processInstanceList.get(i);
Date endTime = processInstance.getEndTime() == null ? new Date() :
processInstance.getEndTime();
parentTreeViewDto.getInstances().add(new
Instance(processInstance.getId(), processInstance.getName(),
processInstance.getProcessDefinitionCode(),
- "", processInstance.getState().toString(),
processInstance.getStartTime(), endTime, processInstance.getHost(),
- DateUtils.format2Readable(endTime.getTime() -
processInstance.getStartTime().getTime())));
+ "", processInstance.getState().toString(),
processInstance.getStartTime(), endTime, processInstance.getHost(),
+ DateUtils.format2Readable(endTime.getTime() -
processInstance.getStartTime().getTime())));
}
List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@@ -1184,11 +1188,11 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
if (taskInstance.isSubProcess()) {
TaskDefinition taskDefinition =
taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessId =
Integer.parseInt(JSONUtils.parseObject(
-
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
+
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
}
treeViewDto.getInstances().add(new
Instance(taskInstance.getId(), taskInstance.getName(),
taskInstance.getTaskCode(),
- taskInstance.getTaskType(),
taskInstance.getState().toString(), taskInstance.getStartTime(),
taskInstance.getEndTime(),
- taskInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()),
subProcessId));
+ taskInstance.getTaskType(),
taskInstance.getState().toString(), taskInstance.getStartTime(),
taskInstance.getEndTime(),
+ taskInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()),
subProcessId));
}
}
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@@ -1249,9 +1253,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* batch copy process definition
*
- * @param loginUser loginUser
- * @param projectCode projectCode
- * @param codes processDefinitionCodes
+ * @param loginUser loginUser
+ * @param projectCode projectCode
+ * @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@@ -1272,9 +1276,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* batch move process definition
*
- * @param loginUser loginUser
- * @param projectCode projectCode
- * @param codes processDefinitionCodes
+ * @param loginUser loginUser
+ * @param projectCode projectCode
+ * @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@@ -1307,7 +1311,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- if (StringUtils.isEmpty(processDefinitionCodes)) {
+ if
(org.apache.commons.lang.StringUtils.isEmpty(processDefinitionCodes)) {
putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY,
processDefinitionCodes);
return result;
}
@@ -1337,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
for (ProcessDefinition processDefinition : processDefinitionList) {
List<ProcessTaskRelation> processTaskRelations =
-
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
List<ProcessTaskRelationLog> taskRelationList =
processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
@@ -1373,10 +1377,10 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* switch the defined process definition version
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param code process definition code
- * @param version the version user want to switch
+ * @param code process definition code
+ * @param version the version user want to switch
* @return switch process definition version result code
*/
@Override
@@ -1412,11 +1416,11 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* check batch operate result
*
- * @param srcProjectCode srcProjectCode
+ * @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode
- * @param result result
+ * @param result result
* @param failedProcessList failedProcessList
- * @param isCopy isCopy
+ * @param isCopy isCopy
*/
private void checkBatchOperateResult(long srcProjectCode, long
targetProjectCode,
Map<String, Object> result,
List<String> failedProcessList, boolean isCopy) {
@@ -1434,11 +1438,11 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* query the pagination versions info by one certain process definition
code
*
- * @param loginUser login user info to check auth
+ * @param loginUser login user info to check auth
* @param projectCode project code
- * @param pageNo page number
- * @param pageSize page size
- * @param code process definition code
+ * @param pageNo page number
+ * @param pageSize page size
+ * @param code process definition code
* @return the pagination process definition versions info of the certain
process definition
*/
@Override
@@ -1468,10 +1472,10 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* delete one certain process definition by version number and process
definition code
*
- * @param loginUser login user info to check auth
+ * @param loginUser login user info to check auth
* @param projectCode project code
- * @param code process definition code
- * @param version version number
+ * @param code process definition code
+ * @param version version number
* @return delete result code
*/
@Override
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
index 4737c2f..2ac632f 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
@@ -22,6 +22,7 @@ import
org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -94,10 +95,10 @@ public class ProcessDefinitionControllerTest {
result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user,
projectCode, name, description, globalParams,
- locations, timeout, tenantCode, relationJson,
taskDefinitionJson)).thenReturn(result);
+ locations, timeout, tenantCode, relationJson,
taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Result response =
processDefinitionController.createProcessDefinition(user, projectCode, name,
description, globalParams,
- locations, timeout, tenantCode, relationJson,
taskDefinitionJson);
+ locations, timeout, tenantCode, relationJson,
taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.SUCCESS.getCode(),
response.getCode().intValue());
}
@@ -157,10 +158,10 @@ public class ProcessDefinitionControllerTest {
result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user,
projectCode, name, code, description, globalParams,
- locations, timeout, tenantCode, relationJson,
taskDefinitionJson)).thenReturn(result);
+ locations, timeout, tenantCode, relationJson,
taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Result response =
processDefinitionController.updateProcessDefinition(user, projectCode, name,
code, description, globalParams,
- locations, timeout, tenantCode, relationJson,
taskDefinitionJson, ReleaseState.OFFLINE);
+ locations, timeout, tenantCode, relationJson,
taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(),
response.getCode().intValue());
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 506bba3..3714de9 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
@@ -610,7 +611,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
Map<String, Object> updateResult =
processDefinitionService.updateProcessDefinition(loginUser, projectCode,
"test", 1,
- "", "", "", 0, "root", null, null);
+ "", "", "", 0, "root", null, null,
ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.DATA_IS_NOT_VALID,
updateResult.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index fc9e36f..fa9a490 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -876,6 +876,12 @@ public final class Constants {
ExecutionStatus.WAITING_DEPEND.ordinal()
};
+ public static final int[] RUNNING_PROCESS_STATE = new int[] {
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.SERIAL_WAIT.ordinal()
+ };
+
/**
* status
*/
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 4f24977..9bff968 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -50,7 +50,9 @@ public enum CommandType {
REPEAT_RUNNING(7, "repeat running a process"),
PAUSE(8, "pause a process"),
STOP(9, "stop a process"),
- RECOVER_WAITING_THREAD(10, "recover waiting thread");
+ RECOVER_WAITING_THREAD(10, "recover waiting thread"),
+ RECOVER_SERIAL_WAIT(11, "recover serial wait"),
+ ;
CommandType(int code, String descp){
this.code = code;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 637eab2..99ba93e 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -56,7 +56,8 @@ public enum ExecutionStatus {
WAITING_THREAD(10, "waiting thread"),
WAITING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution"),
- FORCED_SUCCESS(13, "forced success");
+ FORCED_SUCCESS(13, "forced success"),
+ SERIAL_WAIT(14, "serial wait");
ExecutionStatus(int code, String descp) {
this.code = code;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
new file mode 100644
index 0000000..50c46fd
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import java.util.HashMap;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+public enum ProcessExecutionTypeEnum {
+
+ PARALLEL(0, "parallel"),
+ SERIAL_WAIT(1, "serial wait"),
+ SERIAL_DISCARD(2, "serial discard"),
+ SERIAL_PRIORITY(3, "serial priority");
+
+ ProcessExecutionTypeEnum(int code, String descp) {
+ this.code = code;
+ this.descp = descp;
+ }
+
+ @EnumValue
+ private final int code;
+ private final String descp;
+
+ private static HashMap<Integer, ProcessExecutionTypeEnum>
EXECUTION_STATUS_MAP = new HashMap<>();
+
+ static {
+ for (ProcessExecutionTypeEnum executionType :
ProcessExecutionTypeEnum.values()) {
+ EXECUTION_STATUS_MAP.put(executionType.code, executionType);
+ }
+ }
+
+ public boolean typeIsSerial() {
+ return this != PARALLEL;
+ }
+
+ public boolean typeIsSerialWait() {
+ return this == SERIAL_WAIT;
+ }
+
+ public boolean typeIsSerialDiscard() {
+ return this == SERIAL_DISCARD;
+ }
+
+ public boolean typeIsSerialPriority() {
+ return this == SERIAL_PRIORITY;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getDescp() {
+ return descp;
+ }
+
+ public static ProcessExecutionTypeEnum of(int executionType) {
+ if (EXECUTION_STATUS_MAP.containsKey(executionType)) {
+ return EXECUTION_STATUS_MAP.get(executionType);
+ }
+ throw new IllegalArgumentException("invalid status : " +
executionType);
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 8e50ce8..2fbcac1 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -174,8 +175,12 @@ public class ProcessDefinition {
@TableField(exist = false)
private int warningGroupId;
- public ProcessDefinition() {
- }
+ /**
+ * execution type
+ */
+ private ProcessExecutionTypeEnum executionType;
+
+ public ProcessDefinition() { }
public ProcessDefinition(long projectCode,
String name,
@@ -412,6 +417,14 @@ public class ProcessDefinition {
this.warningGroupId = warningGroupId;
}
+ public ProcessExecutionTypeEnum getExecutionType() {
+ return executionType;
+ }
+
+ public void setExecutionType(ProcessExecutionTypeEnum executionType) {
+ this.executionType = executionType;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -430,6 +443,7 @@ public class ProcessDefinition {
&& Objects.equals(description, that.description)
&& Objects.equals(globalParams, that.globalParams)
&& flag == that.flag
+ && executionType == that.executionType
&& Objects.equals(locations, that.locations);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
index 30840e8..ee11ba7 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
@@ -67,6 +67,7 @@ public class ProcessDefinitionLog extends ProcessDefinition {
this.setModifyBy(processDefinition.getModifyBy());
this.setResourceIds(processDefinition.getResourceIds());
this.setWarningGroupId(processDefinition.getWarningGroupId());
+ this.setExecutionType(processDefinition.getExecutionType());
}
public int getOperator() {
@@ -89,4 +90,5 @@ public class ProcessDefinitionLog extends ProcessDefinition {
public boolean equals(Object o) {
return super.equals(o);
}
+
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 18c386b..0b7ee2f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -238,6 +238,10 @@ public class ProcessInstance {
* varPool string
*/
private String varPool;
+ /**
+ * serial queue next processInstanceId
+ */
+ private int nextProcessInstanceId;
/**
* dry run flag
@@ -706,4 +710,11 @@ public class ProcessInstance {
return Objects.hash(id);
}
+ public int getNextProcessInstanceId() {
+ return nextProcessInstanceId;
+ }
+
+ public void setNextProcessInstanceId(int nextProcessInstanceId) {
+ this.nextProcessInstanceId = nextProcessInstanceId;
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 7be58a7..6e85163 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -233,6 +233,13 @@ public interface ProcessInstanceMapper extends
BaseMapper<ProcessInstance> {
List<ProcessInstance>
queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long
processDefinitionCode,
@Param("states")
int[] states);
+ List<ProcessInstance>
queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long
processDefinitionCode,
+ @Param("states")
int[] states, @Param("id") int id);
+
int updateGlobalParamsById(@Param("globalParams") String globalParams,
@Param("id") int id);
+
+ boolean updateNextProcessIdById(@Param("thisInstanceId") int
thisInstanceId, @Param("runningInstanceId")int runningInstanceId);
+
+ ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode")
Long processDefinitionCode,@Param("state") int state);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
index 40afa04..ee9f7c0 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -22,14 +22,14 @@
<sql id="baseSql">
id, code, name, version, description, project_code,
release_state, user_id,global_params, flag, locations,
- warning_group_id, timeout, tenant_id,operator, operate_time,
create_time,
+ warning_group_id, timeout, tenant_id,operator,execution_type,
operate_time, create_time,
update_time
</sql>
<select id="queryByDefinitionName"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select pd.id, pd.code, pd.name, pd.version, pd.description,
pd.project_code,
pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations,
pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator,
pd.operate_time, pd.create_time,
- pd.update_time, u.user_name,p.name as project_name
+ pd.update_time, u.user_name,p.name as project_name ,pd.execution_type
from t_ds_process_definition_log pd
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index 9f76dd1..39314bb 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -20,12 +20,12 @@
<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper">
<sql id="baseSql">
id, code, name, version, release_state, project_code, user_id,
description,
- global_params, flag, locations, warning_group_id, create_time,
timeout, tenant_id, update_time
+ global_params, flag, locations, warning_group_id, create_time,
timeout, tenant_id, update_time,execution_type
</sql>
<select id="verifyByDefineName"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select pd.id, pd.code, pd.name, pd.version, pd.release_state,
pd.project_code, pd.user_id, pd.description,
- pd.global_params, pd.flag, pd.locations, pd.warning_group_id,
pd.create_time, pd.timeout, pd.tenant_id, pd.update_time
+ pd.global_params, pd.flag, pd.locations, pd.warning_group_id,
pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,pd.execution_type
from t_ds_process_definition pd
WHERE pd.project_code = #{projectCode}
and pd.name = #{processDefinitionName}
@@ -58,7 +58,7 @@
<select id="queryByDefineName"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select pd.id, pd.code, pd.name, pd.version, pd.release_state,
pd.project_code, p.id as project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.warning_group_id,
pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,
- u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name
+ u.user_name,p.name as
project_name,t.tenant_code,q.queue,q.queue_name,pd.execution_type
from t_ds_process_definition pd
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code
@@ -70,7 +70,7 @@
<select id="queryDefineListPaging"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT td.id, td.code, td.name, td.version, td.release_state,
td.project_code, td.user_id, td.description,
td.global_params, td.flag, td.warning_group_id, td.timeout,
td.tenant_id, td.update_time, td.create_time,
- sc.schedule_release_state, tu.user_name
+ sc.schedule_release_state, tu.user_name ,td.execution_type
FROM t_ds_process_definition td
left join (select process_definition_code,release_state as
schedule_release_state from t_ds_schedules group by
process_definition_code,release_state) sc on
sc.process_definition_code = td.code
@@ -127,7 +127,7 @@
SELECT
pd.id, pd.code, pd.name, pd.version, pd.release_state,
pd.project_code, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.warning_group_id,
pd.create_time, pd.timeout,
- pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name
+ pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name
,pd.execution_type
FROM
t_ds_process_definition pd,
t_ds_user u,
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
index 249fb86..1dc6168 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
@@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper">
<sql id="baseSql">
- id, parent_process_instance_id, parent_task_instance_id,
process_instance_id
+ id, parent_process_instance_id, parent_task_instance_id,
process_instance_id,next_process_instance_id
</sql>
<delete id="deleteByParentProcessId">
delete
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 08db3af..05a56a4 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -23,7 +23,7 @@
command_type, command_param, task_depend_type, max_try_times,
failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params,
flag,
update_time, is_sub_process, executor_id, history_cmd,
- process_instance_priority, worker_group,environment_code, timeout,
tenant_id, var_pool, dry_run
+ process_instance_priority, worker_group,environment_code, timeout,
tenant_id, var_pool, dry_run,next_process_instance_id
</sql>
<select id="queryDetailById"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@@ -90,7 +90,7 @@
<select id="queryProcessInstanceListPaging"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id,
instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state,
instance.schedule_time, instance.start_time,
- instance.end_time, instance.run_times, instance.recovery,
instance.host, instance.dry_run
+ instance.end_time, instance.run_times, instance.recovery,
instance.host, instance.dry_run ,instance.next_process_instance_id
from t_ds_process_instance instance
join t_ds_process_definition define ON
instance.process_definition_code = define.code
where instance.is_sub_process=0
@@ -218,9 +218,36 @@
</foreach>
order by id asc
</select>
+ <select id="queryByProcessDefineCodeAndStatusAndNextId"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_instance
+ where process_definition_code=#{processDefinitionCode}
+ and state in
+ <foreach collection="states" item="i" open="(" close=")" separator=",">
+ #{i}
+ </foreach>
+ and next_process_instance_id=0
+ and id <![CDATA[ < ]]> #{id}
+ order by id desc
+ </select>
+ <select id="loadNextProcess4Serial"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_instance
+ where process_definition_code=#{processDefinitionCode}
+ and state = #{state}
+ and id <![CDATA[ < ]]> #{id}
+ order by id desc limit 1
+ </select>
<update id="updateGlobalParamsById">
update t_ds_process_instance
set global_params = #{globalParams}
where id = #{id}
</update>
+ <update id="updateNextProcessIdById">
+ update t_ds_process_instance
+ set next_process_instance_id = #{thisInstanceId}
+ where id = #{runningInstanceId} and next_process_instance_id=0
+ </update>
</mapper>
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 500f2d2..f4527b8 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -281,6 +281,7 @@ class ProcessDefinition(Base):
# TODO add serialization function
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
+ None,
)
return self._process_definition_code
diff --git
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 08390e9..4e15fbb 100644
---
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -175,7 +176,8 @@ public class PythonGatewayServer extends
SpringBootServletInitializer {
int timeout,
String tenantCode,
String taskRelationJson,
- String taskDefinitionJson) {
+ String taskDefinitionJson,
+ ProcessExecutionTypeEnum
executionType) {
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user,
projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
@@ -189,12 +191,12 @@ public class PythonGatewayServer extends
SpringBootServletInitializer {
// make sure process definition offline which could edit
processDefinitionService.releaseProcessDefinition(user,
projectCode, processDefinitionCode, ReleaseState.OFFLINE);
Map<String, Object> result =
processDefinitionService.updateProcessDefinition(user, projectCode, name,
processDefinitionCode, description, globalParams,
- locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson);
+ locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson,executionType);
return processDefinitionCode;
} else if (verifyStatus == Status.SUCCESS) {
// create process definition
Map<String, Object> result =
processDefinitionService.createProcessDefinition(user, projectCode, name,
description, globalParams,
- locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson);
+ locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson,executionType);
ProcessDefinition processDefinition = (ProcessDefinition)
result.get(Constants.DATA_LIST);
return processDefinition.getCode();
} else {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index c501262..6ed8912 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
@@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -510,6 +512,10 @@ public class WorkflowExecuteThread implements Runnable {
private void endProcess() {
this.stateEvents.clear();
processInstance.setEndTime(new Date());
+ ProcessDefinition processDefinition =
this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());
+ if (processDefinition.getExecutionType().typeIsSerialWait()) {
+ checkSerialProcess(processDefinition);
+ }
processService.updateProcessInstance(processInstance);
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null,
processInstance);
@@ -519,6 +525,29 @@ public class WorkflowExecuteThread implements Runnable {
processAlertManager.sendAlertProcessInstance(processInstance,
taskInstances, projectUser);
}
+ public void checkSerialProcess(ProcessDefinition processDefinition) {
+ this.processInstance =
processService.findProcessInstanceById(processInstance.getId());
+ int nextInstanceId = processInstance.getNextProcessInstanceId();
+ if (nextInstanceId == 0) {
+ ProcessInstance nextProcessInstance =
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode());
+ if (nextProcessInstance == null) {
+ return;
+ }
+ nextInstanceId = nextProcessInstance.getId();
+ }
+ ProcessInstance nextProcessInstance =
this.processService.findProcessInstanceById(nextInstanceId);
+ if (nextProcessInstance.getState().typeIsFinished() ||
nextProcessInstance.getState().typeIsRunning()) {
+ return;
+ }
+ Map<String, Object> cmdParam = new HashMap<>();
+ cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId);
+ Command command = new Command();
+ command.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+ command.setProcessDefinitionCode(processDefinition.getCode());
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ processService.createCommand(command);
+ }
+
/**
* generate process dag
*
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 1b4d3bf..32b2b5e 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -111,9 +112,7 @@ public class WorkflowExecuteThreadTest {
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(workflowExecuteThread, new DAG());
- PowerMockito.doNothing().when(workflowExecuteThread, "executeProcess");
PowerMockito.doNothing().when(workflowExecuteThread, "prepareProcess");
- PowerMockito.doNothing().when(workflowExecuteThread, "runProcess");
PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
}
@@ -256,6 +255,36 @@ public class WorkflowExecuteThreadTest {
}
}
+ @Test
+ public void testCheckSerialProcess() {
+ try {
+ ProcessDefinition processDefinition1 = new ProcessDefinition();
+ processDefinition1.setId(123);
+ processDefinition1.setName("test");
+ processDefinition1.setVersion(1);
+ processDefinition1.setCode(11L);
+
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT);
+ Mockito.when(processInstance.getId()).thenReturn(225);
+
Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance);
+ workflowExecuteThread.checkSerialProcess(processDefinition1);
+
+ Mockito.when(processInstance.getId()).thenReturn(225);
+
Mockito.when(processInstance.getNextProcessInstanceId()).thenReturn(222);
+
+ ProcessInstance processInstance9 = new ProcessInstance();
+ processInstance9.setId(222);
+ processInstance9.setProcessDefinitionCode(11L);
+ processInstance9.setProcessDefinitionVersion(1);
+ processInstance9.setState(ExecutionStatus.SERIAL_WAIT);
+
+
Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance);
+
Mockito.when(processService.findProcessInstanceById(222)).thenReturn(processInstance9);
+ workflowExecuteThread.checkSerialProcess(processDefinition1);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
private List<Schedule> zeroSchedulerList() {
return Collections.emptyList();
}
@@ -268,4 +297,4 @@ public class WorkflowExecuteThreadTest {
return schedulerList;
}
-}
\ No newline at end of file
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 1c8e72c..c13fbda 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -100,6 +100,8 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -200,6 +202,10 @@ public class ProcessService {
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+ @Autowired
+ StateEventCallbackService stateEventCallbackService;
+
@Autowired
private EnvironmentMapper environmentMapper;
@@ -222,12 +228,77 @@ public class ProcessService {
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
- saveProcessInstance(processInstance);
+ //if the processDefination is serial
+ ProcessDefinition processDefinition =
this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
+ if (processDefinition.getExecutionType().typeIsSerial()) {
+ saveSerialProcess(processInstance,processDefinition);
+ if (processInstance.getState() !=
ExecutionStatus.SUBMITTED_SUCCESS) {
+ this.setSubProcessParam(processInstance);
+ this.commandMapper.deleteById(command.getId());
+ return null;
+ }
+ } else {
+ saveProcessInstance(processInstance);
+ }
this.setSubProcessParam(processInstance);
this.commandMapper.deleteById(command.getId());
return processInstance;
}
+ private void saveSerialProcess(ProcessInstance
processInstance,ProcessDefinition processDefinition) {
+ processInstance.setState(ExecutionStatus.SERIAL_WAIT);
+ saveProcessInstance(processInstance);
+ //serial wait
+ //when we get the running instance(or waiting instance) only get the
priority instance(by id)
+ if (processDefinition.getExecutionType().typeIsSerialWait()) {
+ while (true) {
+ List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+
Constants.RUNNING_PROCESS_STATE,processInstance.getId());
+ if (CollectionUtils.isEmpty(runningProcessInstances)) {
+
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+ saveProcessInstance(processInstance);
+ return;
+ }
+ ProcessInstance runningProcess =
runningProcessInstances.get(0);
+ if
(this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(),
runningProcess.getId())) {
+ return;
+ }
+ }
+ } else if (processDefinition.getExecutionType().typeIsSerialDiscard())
{
+ List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ Constants.RUNNING_PROCESS_STATE,processInstance.getId());
+ if (CollectionUtils.isEmpty(runningProcessInstances)) {
+ processInstance.setState(ExecutionStatus.STOP);
+ saveProcessInstance(processInstance);
+ }
+ } else if
(processDefinition.getExecutionType().typeIsSerialPriority()) {
+ List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ Constants.RUNNING_PROCESS_STATE,processInstance.getId());
+ if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
+ for (ProcessInstance info : runningProcessInstances) {
+ info.setCommandType(CommandType.STOP);
+ info.addHistoryCmd(CommandType.STOP);
+ info.setState(ExecutionStatus.READY_STOP);
+ int update = updateProcessInstance(info);
+ // determine whether the process is normal
+ if (update > 0) {
+ String host = info.getHost();
+ String address = host.split(":")[0];
+ int port = Integer.parseInt(host.split(":")[1]);
+ StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
+ info.getId(), 0, info.getState(),
info.getId(), 0
+ );
+ try {
+ stateEventCallbackService.sendResult(address,
port, stateEventChangeCommand.convert2Command());
+ } catch (Exception e) {
+ logger.error("sendResultError");
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* save error command, and delete original command
*
@@ -2495,4 +2566,8 @@ public class ProcessService {
}
return processTaskMap;
}
+
+ public ProcessInstance loadNextProcess4Serial(long code, int state) {
+ return this.processInstanceMapper.loadNextProcess4Serial(code,state);
+ }
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 2112563..b55aee5 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
@@ -86,7 +87,7 @@ import com.fasterxml.jackson.databind.JsonNode;
/**
* process service test
*/
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
public class ProcessServiceTest {
private static final Logger logger =
LoggerFactory.getLogger(CronUtilsTest.class);
@@ -257,16 +258,25 @@ public class ProcessServiceTest {
command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
+
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123);
processDefinition.setName("test");
processDefinition.setVersion(definitionVersion);
processDefinition.setCode(definitionCode);
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
+ processDefinition.setExecutionType(ProcessExecutionTypeEnum.PARALLEL);
+
ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(222);
+ processInstance.setProcessDefinitionCode(11L);
+ processInstance.setHost("127.0.0.1:5678");
+ processInstance.setProcessDefinitionVersion(1);
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionCode(definitionCode);
processInstance.setProcessDefinitionVersion(definitionVersion);
+
+
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new
ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
@@ -309,6 +319,77 @@ public class ProcessServiceTest {
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
ProcessInstance processInstance1 =
processService.handleCommand(logger, host, command5,
processDefinitionCacheMaps);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
+
+ ProcessDefinition processDefinition1 = new ProcessDefinition();
+ processDefinition1.setId(123);
+ processDefinition1.setName("test");
+ processDefinition1.setVersion(1);
+ processDefinition1.setCode(11L);
+ processDefinition1.setVersion(1);
+
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT);
+ List<ProcessInstance> lists = new ArrayList<>();
+ ProcessInstance processInstance11 = new ProcessInstance();
+ processInstance11.setId(222);
+ processInstance11.setProcessDefinitionCode(11L);
+ processInstance11.setProcessDefinitionVersion(1);
+ processInstance11.setHost("127.0.0.1:5678");
+ lists.add(processInstance11);
+
+ ProcessInstance processInstance2 = new ProcessInstance();
+ processInstance2.setId(223);
+ processInstance2.setProcessDefinitionCode(11L);
+ processInstance2.setProcessDefinitionVersion(1);
+
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
+
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command1, processDefinitionCacheMaps));
+ Command command6 = new Command();
+ command6.setProcessDefinitionCode(11L);
+ command6.setCommandParam("{\"ProcessInstanceId\":223}");
+ command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+ command6.setProcessDefinitionVersion(1);
+
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,223)).thenReturn(lists);
+ Mockito.when(processInstanceMapper.updateNextProcessIdById(223,
222)).thenReturn(true);
+ ProcessInstance processInstance6 =
processService.handleCommand(logger, host, command6,
processDefinitionCacheMaps);
+ Assert.assertTrue(processInstance6 != null);
+
+
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
+
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
+ ProcessInstance processInstance7 = new ProcessInstance();
+ processInstance7.setId(224);
+ processInstance7.setProcessDefinitionCode(11L);
+ processInstance7.setProcessDefinitionVersion(1);
+
Mockito.when(processInstanceMapper.queryDetailById(224)).thenReturn(processInstance7);
+
+ Command command7 = new Command();
+ command7.setProcessDefinitionCode(11L);
+ command7.setCommandParam("{\"ProcessInstanceId\":224}");
+ command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+ command7.setProcessDefinitionVersion(1);
+
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,224)).thenReturn(null);
+ ProcessInstance processInstance8 =
processService.handleCommand(logger, host, command7,
processDefinitionCacheMaps);
+ Assert.assertTrue(processInstance8 == null);
+
+ ProcessDefinition processDefinition2 = new ProcessDefinition();
+ processDefinition2.setId(123);
+ processDefinition2.setName("test");
+ processDefinition2.setVersion(1);
+ processDefinition2.setCode(12L);
+
processDefinition2.setExecutionType(ProcessExecutionTypeEnum.SERIAL_PRIORITY);
+
Mockito.when(processDefineMapper.queryByCode(12L)).thenReturn(processDefinition2);
+ ProcessInstance processInstance9 = new ProcessInstance();
+ processInstance9.setId(225);
+ processInstance9.setProcessDefinitionCode(11L);
+ processInstance9.setProcessDefinitionVersion(1);
+ Command command9 = new Command();
+ command9.setProcessDefinitionCode(12L);
+ command9.setCommandParam("{\"ProcessInstanceId\":225}");
+ command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
+ command9.setProcessDefinitionVersion(1);
+
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
+
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L,Constants.RUNNING_PROCESS_STATE,0)).thenReturn(lists);
+
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
+ ProcessInstance processInstance10 =
processService.handleCommand(logger, host, command9,
processDefinitionCacheMaps);
+ Assert.assertTrue(processInstance10 == null);
}
@Test
diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql
index 172686f..7c4f301 100644
--- a/sql/dolphinscheduler_h2.sql
+++ b/sql/dolphinscheduler_h2.sql
@@ -412,6 +412,7 @@ CREATE TABLE t_ds_process_definition
warning_group_id int(11) DEFAULT NULL,
timeout int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
+ execution_type tinyint(4) DEFAULT '0',
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY (id),
@@ -443,6 +444,7 @@ CREATE TABLE t_ds_process_definition_log
warning_group_id int(11) DEFAULT NULL,
timeout int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
+ execution_type tinyint(4) DEFAULT '0',
operator int(11) DEFAULT NULL,
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
@@ -595,6 +597,7 @@ CREATE TABLE t_ds_process_instance
worker_group varchar(64) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
timeout int(11) DEFAULT '0',
+ next_process_instance_id int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
var_pool longtext,
dry_run int NULL DEFAULT 0,
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 5a27912..cd4c84b 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -414,6 +414,7 @@ CREATE TABLE `t_ds_process_definition` (
`warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
+ `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type
0:parallel,1:serial wait,2:serial discard,3:serial priority',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`),
@@ -443,6 +444,7 @@ CREATE TABLE `t_ds_process_definition_log` (
`warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out,unit: minute',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
+ `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type
0:parallel,1:serial wait,2:serial discard,3:serial priority',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
@@ -593,6 +595,7 @@ CREATE TABLE `t_ds_process_instance` (
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flagļ¼0 normal, 1 dry run',
+ `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next
processInstanceId',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 826e377..8c39bb1 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -331,6 +331,7 @@ CREATE TABLE t_ds_process_definition (
flag int DEFAULT NULL ,
timeout int DEFAULT '0' ,
tenant_id int DEFAULT '-1' ,
+ execution_type int DEFAULT '0',
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id) ,
@@ -355,6 +356,7 @@ CREATE TABLE t_ds_process_definition_log (
flag int DEFAULT NULL ,
timeout int DEFAULT '0' ,
tenant_id int DEFAULT '-1' ,
+ execution_type int DEFAULT '0',
operator int DEFAULT NULL ,
operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
@@ -498,6 +500,7 @@ CREATE TABLE t_ds_process_instance (
tenant_id int NOT NULL DEFAULT '-1' ,
var_pool text ,
dry_run int DEFAULT '0' ,
+ next_process_instance_id int DEFAULT '0',
PRIMARY KEY (id)
) ;