This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split_two by this push:
new ee4e64a [Feature][JsonSplit-api] refactor method of task save (#6067)
ee4e64a is described below
commit ee4e64a9a05751a5bccd7ef729c97a75d08033c1
Author: JinyLeeChina <[email protected]>
AuthorDate: Tue Aug 31 15:17:06 2021 +0800
[Feature][JsonSplit-api] refactor method of task save (#6067)
* refactor method of task save
* fix ut
* fix ut
Co-authored-by: JinyLeeChina <[email protected]>
---
.../api/controller/ProcessInstanceController.java | 5 +-
.../apache/dolphinscheduler/api/enums/Status.java | 1 +
.../api/service/ProcessInstanceService.java | 2 +
.../service/impl/ProcessDefinitionServiceImpl.java | 112 +++++++++++++--------
.../service/impl/ProcessInstanceServiceImpl.java | 46 ++++++---
.../service/impl/TaskDefinitionServiceImpl.java | 83 +++------------
.../api/service/ProcessInstanceServiceTest.java | 20 ++--
.../api/service/TaskDefinitionServiceImplTest.java | 17 +++-
.../dolphinscheduler/common/utils/JSONUtils.java | 1 -
.../dao/entity/ProcessTaskRelation.java | 33 +++---
.../dao/entity/TaskDefinition.java | 10 +-
.../service/process/ProcessService.java | 88 ++++++++++++++--
12 files changed, 255 insertions(+), 163 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index f46e7d8..bd1fa85 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -162,6 +162,7 @@ public class ProcessInstanceController extends
BaseController {
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
+ * @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
@@ -172,6 +173,7 @@ public class ProcessInstanceController extends
BaseController {
@ApiOperation(value = "updateProcessInstance", notes =
"UPDATE_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskRelationJson", value =
"TASK_RELATION_JSON", type = "String"),
+ @ApiImplicitParam(name = "taskDefinitionJson", value =
"TASK_DEFINITION_JSON", type = "String"),
@ApiImplicitParam(name = "processInstanceId", value =
"PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type
= "String"),
@ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required
= true, type = "Boolean"),
@@ -187,6 +189,7 @@ public class ProcessInstanceController extends
BaseController {
public Result updateProcessInstance(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value
= "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value =
"taskRelationJson", required = true) String taskRelationJson,
+ @RequestParam(value =
"taskDefinitionJson", required = true) String taskDefinitionJson,
@RequestParam(value =
"processInstanceId") Integer processInstanceId,
@RequestParam(value = "scheduleTime",
required = false) String scheduleTime,
@RequestParam(value = "syncDefine",
required = true) Boolean syncDefine,
@@ -195,7 +198,7 @@ public class ProcessInstanceController extends
BaseController {
@RequestParam(value = "timeout",
required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode",
required = true) String tenantCode) {
Map<String, Object> result =
processInstanceService.updateProcessInstance(loginUser, projectCode,
processInstanceId,
- taskRelationJson, scheduleTime, syncDefine, globalParams,
locations, timeout, tenantCode);
+ taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine,
globalParams, locations, timeout, tenantCode);
return returnDataList(result);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index ac3293c..ca60fd8 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -276,6 +276,7 @@ public enum Status {
DELETE_TASK_DEFINE_BY_CODE_ERROR(50042, "delete task definition by code
error", "删除任务定义错误"),
QUERY_DETAIL_OF_TASK_DEFINITION_ERROR(50043, "query detail of task
definition error", "查询任务详细信息错误"),
QUERY_TASK_DEFINITION_LIST_PAGING_ERROR(50044, "query task definition list
paging error", "分页查询任务定义列表错误"),
+ TASK_DEFINITION_NAME_EXISTED(50045, "task definition name [{0}] already
exists", "任务定义名称[{0}]已经存在"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 0190d17..3dbf46d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -116,6 +116,7 @@ public interface ProcessInstanceService {
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
+ * @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
@@ -129,6 +130,7 @@ public interface ProcessInstanceService {
long projectCode,
Integer processInstanceId,
String taskRelationJson,
+ String taskDefinitionJson,
String scheduleTime,
Boolean syncDefine,
String globalParams,
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index c4f9114..1750098 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -27,7 +27,6 @@ import
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
-import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -61,7 +60,6 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@@ -121,9 +119,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
private ProjectService projectService;
@Autowired
- private TaskDefinitionService taskDefinitionService;
-
- @Autowired
private UserMapper userMapper;
@Autowired
@@ -147,21 +142,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
- @Autowired
- private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
-
- @Autowired
- TaskDefinitionLogMapper taskDefinitionLogMapper;
-
- @Autowired
- private TaskDefinitionMapper taskDefinitionMapper;
-
- @Autowired
- private SchedulerService schedulerService;
-
- @Autowired
- private TenantMapper tenantMapper;
-
/**
* create process definition
*
@@ -178,7 +158,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* @return create result code
*/
@Override
- @Transactional(rollbackFor = Exception.class)
+ @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> createProcessDefinition(User loginUser,
long projectCode,
String name,
@@ -202,9 +182,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
-
+ List<TaskDefinitionLog> taskDefinitionLogs =
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+ createTaskDefinition(result, loginUser, projectCode,
taskDefinitionLogs, taskDefinitionJson);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
- Map<String, Object> checkRelationJson =
checkTaskRelationList(taskRelationList, taskRelationJson);
+ Map<String, Object> checkRelationJson =
checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
@@ -215,8 +199,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- taskDefinitionService.createTaskDefinition(loginUser, projectCode,
taskDefinitionJson);
-
long processDefinitionCode;
try {
processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
@@ -227,16 +209,59 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessDefinition processDefinition = new
ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(),
tenant.getId());
- return createProcessDefine(loginUser, result, taskRelationList,
processDefinition);
+ return createProcessDefine(loginUser, result, taskRelationList,
processDefinition, taskDefinitionLogs);
+ }
+
+ @Autowired
+ TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Autowired
+ private SchedulerService schedulerService;
+
+ @Autowired
+ private TenantMapper tenantMapper;
+
+ private void createTaskDefinition(Map<String, Object> result,
+ User loginUser,
+ long projectCode,
+ List<TaskDefinitionLog>
taskDefinitionLogs,
+ String taskDefinitionJson) {
+ if (taskDefinitionLogs.isEmpty()) {
+ logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
+ putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+ return;
+ }
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
+ logger.error("task definition {} parameter invalid",
taskDefinitionLog.getName());
+ putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionLog.getName());
+ return;
+ }
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
+ if (taskDefinition != null) {
+ logger.error("task definition name {} already exists",
taskDefinitionLog.getName());
+ putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED,
taskDefinitionLog.getName());
+ return;
+ }
+ }
+ if (processService.saveTaskDefine(loginUser, projectCode,
taskDefinitionLogs)) {
+ putMsg(result, Status.SUCCESS);
+ } else {
+ putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+ }
}
private Map<String, Object> createProcessDefine(User loginUser,
Map<String, Object> result,
List<ProcessTaskRelationLog> taskRelationList,
- ProcessDefinition
processDefinition) {
+ ProcessDefinition
processDefinition,
+ List<TaskDefinitionLog>
taskDefinitionLogs) {
int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, true);
if (insertVersion > 0) {
- int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion,
taskRelationList);
+ int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion,
taskRelationList, taskDefinitionLogs);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
// return processDefinitionCode
@@ -250,7 +275,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- private Map<String, Object>
checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String
taskRelationJson) {
+ private Map<String, Object>
checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String
taskRelationJson, List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
try {
if (taskRelationList == null || taskRelationList.isEmpty()) {
@@ -259,7 +284,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- List<TaskNode> taskNodeList =
processService.transformTask(taskRelationList);
+ List<TaskNode> taskNodeList =
processService.transformTask(taskRelationList, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes =
taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
Set<Long> taskNodeCodes =
taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
@@ -276,7 +301,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog :
taskRelationList) {
- if (processTaskRelationLog.getPostTaskCode() == 0 ||
processTaskRelationLog.getPostTaskVersion() == 0) {
+ if (processTaskRelationLog.getPostTaskCode() == 0) {
logger.error("the post_task_code or post_task_version
can't be zero");
putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
return result;
@@ -419,7 +444,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
- @Transactional(rollbackFor = Exception.class)
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateProcessDefinition(User loginUser,
long projectCode,
@@ -439,8 +464,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
+ List<TaskDefinitionLog> taskDefinitionLogs =
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+ createTaskDefinition(result, loginUser, projectCode,
taskDefinitionLogs, taskDefinitionJson);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
- Map<String, Object> checkRelationJson =
checkTaskRelationList(taskRelationList, taskRelationJson);
+ Map<String, Object> checkRelationJson =
checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
@@ -470,20 +500,20 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
}
- taskDefinitionService.createTaskDefinition(loginUser, projectCode,
taskDefinitionJson);
processDefinition.set(projectCode, name, description, globalParams,
locations, timeout, tenant.getId());
- return updateProcessDefine(loginUser, result, taskRelationList,
processDefinition);
+ return updateProcessDefine(loginUser, result, taskRelationList,
processDefinition, taskDefinitionLogs);
}
private Map<String, Object> updateProcessDefine(User loginUser,
Map<String, Object> result,
List<ProcessTaskRelationLog> taskRelationList,
- ProcessDefinition
processDefinition) {
+ ProcessDefinition
processDefinition,
+ List<TaskDefinitionLog>
taskDefinitionLogs) {
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, true);
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
- processDefinition.getCode(), insertVersion, taskRelationList);
+ processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -818,7 +848,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
});
- Map<String, Object> createProcessResult =
createProcessDefine(loginUser, result, taskRelationList, processDefinition);
+ Map<String, Object> createProcessResult =
createProcessDefine(loginUser, result, taskRelationList, processDefinition,
null);
if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) {
putMsg(createProcessResult, Status.SUCCESS);
} else {
@@ -894,7 +924,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(processTaskRelationJson, ProcessTaskRelationLog.class);
// Check whether the task node is normal
- List<TaskNode> taskNodes =
processService.transformTask(taskRelationList);
+ List<TaskNode> taskNodes =
processService.transformTask(taskRelationList, Lists.newArrayList());
if (CollectionUtils.isEmpty(taskNodes)) {
logger.error("process node info is empty");
@@ -1254,9 +1284,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
processDefinition.setName(processDefinition.getName() +
"_copy_" + DateUtils.getCurrentTimeStamp());
- createProcessDefine(loginUser, result, taskRelationList,
processDefinition);
+ createProcessDefine(loginUser, result, taskRelationList,
processDefinition, Lists.newArrayList());
} else {
- updateProcessDefine(loginUser, result, taskRelationList,
processDefinition);
+ updateProcessDefine(loginUser, result, taskRelationList,
processDefinition, Lists.newArrayList());
}
if (result.get(Constants.STATUS) != Status.SUCCESS) {
failedProcessList.add(processDefinition.getCode() + "[" +
processDefinition.getName() + "]");
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 9a6fa3e..9007e00 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@@ -401,6 +402,7 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
+ * @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
@@ -413,7 +415,7 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
@Transactional
@Override
public Map<String, Object> updateProcessInstance(User loginUser, long
projectCode, Integer processInstanceId, String taskRelationJson,
- String scheduleTime,
Boolean syncDefine, String globalParams,
+ String
taskDefinitionJson, String scheduleTime, Boolean syncDefine, String
globalParams,
String locations, int
timeout, String tenantCode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
@@ -433,26 +435,42 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
processInstance.getName(),
processInstance.getState().toString(), "update");
return result;
}
- ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
- List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
- //check workflow json is valid
- result =
processDefinitionService.checkProcessNodeList(taskRelationJson);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
- Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
- if (tenant == null) {
- putMsg(result, Status.TENANT_NOT_EXIST);
- return result;
- }
setProcessInstance(processInstance, tenantCode, scheduleTime,
globalParams, timeout);
if (Boolean.TRUE.equals(syncDefine)) {
+ List<TaskDefinitionLog> taskDefinitionLogs =
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+ if (taskDefinitionLogs.isEmpty()) {
+ putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+ return result;
+ }
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ if
(!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
+ putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionLog.getName());
+ return result;
+ }
+ }
+ if (!processService.saveTaskDefine(loginUser, projectCode,
taskDefinitionLogs)) {
+ putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+ return result;
+ }
+ ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+ //check workflow json is valid
+ result =
processDefinitionService.checkProcessNodeList(taskRelationJson);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+ if (tenant == null) {
+ putMsg(result, Status.TENANT_NOT_EXIST);
+ return result;
+ }
+
processDefinition.set(projectCode, processDefinition.getName(),
processDefinition.getDescription(), globalParams, locations, timeout,
tenant.getId());
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, false);
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
- processDefinition.getCode(), insertVersion,
taskRelationList);
+ processDefinition.getCode(), insertVersion,
taskRelationList, taskDefinitionLogs);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1a078dc..35f3f1e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -111,85 +111,28 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
- int totalSuccessNumber = 0;
- List<Long> totalSuccessCode = new ArrayList<>();
- Date now = new Date();
- List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
- List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
logger.error("task definition {} parameter invalid",
taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionLog.getName());
return result;
}
- taskDefinitionLog.setProjectCode(projectCode);
- taskDefinitionLog.setUpdateTime(now);
- taskDefinitionLog.setOperateTime(now);
- taskDefinitionLog.setOperator(loginUser.getId());
- if (taskDefinitionLog.getCode() > 0 &&
taskDefinitionLog.getVersion() > 0) {
- TaskDefinitionLog definitionCodeAndVersion =
taskDefinitionLogMapper
-
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(),
taskDefinitionLog.getVersion());
- if (definitionCodeAndVersion != null) {
- if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
-
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
- Integer version =
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
- if (version == null || version == 0) {
- putMsg(result, Status.DATA_IS_NOT_VALID,
taskDefinitionLog.getCode());
- return result;
- }
- taskDefinitionLog.setVersion(version + 1);
-
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
- updateTaskDefinitionLogs.add(taskDefinitionLog);
- totalSuccessCode.add(taskDefinitionLog.getCode());
- }
- continue;
- }
- }
- taskDefinitionLog.setUserId(loginUser.getId());
- taskDefinitionLog.setVersion(1);
- taskDefinitionLog.setCreateTime(now);
- if (taskDefinitionLog.getCode() == 0) {
- long code;
- try {
- code = SnowFlakeUtils.getInstance().nextId();
- taskDefinitionLog.setVersion(1);
- taskDefinitionLog.setCode(code);
- } catch (SnowFlakeException e) {
- logger.error("Task code get error, ", e);
- putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error
generating task definition code");
- return result;
- }
- }
- totalSuccessCode.add(taskDefinitionLog.getCode());
- newTaskDefinitionLogs.add(taskDefinitionLog);
- totalSuccessNumber++;
- }
- for (TaskDefinitionLog taskDefinitionToUpdate :
updateTaskDefinitionLogs) {
- TaskDefinition task =
taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
- if (task == null) {
- newTaskDefinitionLogs.add(taskDefinitionToUpdate);
- } else {
- int update =
taskDefinitionMapper.updateById(taskDefinitionToUpdate);
- int insert =
taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
- if ((update & insert) != 1) {
- putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
- return result;
- }
- }
- }
- if (!newTaskDefinitionLogs.isEmpty()) {
- int insert =
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
- int logInsert =
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
- if ((logInsert & insert) == 0) {
- putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
+ if (taskDefinition != null) {
+ logger.error("task definition name {} already exists",
taskDefinitionLog.getName());
+ putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED,
taskDefinitionLog.getName());
return result;
}
}
- Map<String, Object> resData = new HashMap<>();
- resData.put("total", totalSuccessNumber);
- resData.put("code", totalSuccessCode);
- putMsg(result, Status.SUCCESS);
- result.put(Constants.DATA_LIST, resData);
+ if (processService.saveTaskDefine(loginUser, projectCode,
taskDefinitionLogs)) {
+ Map<String, Object> resData = new HashMap<>();
+ resData.put("total", taskDefinitionLogs.size());
+ resData.put("code",
StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()),
","));
+ putMsg(result, Status.SUCCESS);
+ result.put(Constants.DATA_LIST, resData);
+ } else {
+ putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+ }
return result;
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 820d5fb..fa68525 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -116,6 +116,14 @@ public class ProcessInstanceServiceTest {
+
"\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+
"\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
+ private String taskJson =
"[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ + "\"localParams\":[],\"rawScript\":\"echo
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+ +
"\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+ +
"\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
+ +
"\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
2\",\"conditionResult\":{\"successNode\""
+ +
":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\","
+ +
"\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
+
@Test
public void testQueryProcessInstanceList() {
long projectCode = 1L;
@@ -372,7 +380,7 @@ public class ProcessInstanceServiceTest {
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
Map<String, Object> proejctAuthFailRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+ shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT,
proejctAuthFailRes.get(Constants.STATUS));
//process instance null
@@ -382,7 +390,7 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
Map<String, Object> processInstanceNullRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+ shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceNullRes.get(Constants.STATUS));
//process instance not finish
@@ -390,7 +398,7 @@ public class ProcessInstanceServiceTest {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceNotFinishRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+ shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstanceNotFinishRes.get(Constants.STATUS));
//process instance finish
@@ -410,15 +418,15 @@ public class ProcessInstanceServiceTest {
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceFinishRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, "2020-02-21 00:00:00", true, "", "", 0, "root");
- Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR,
processInstanceFinishRes.get(Constants.STATUS));
+ shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0,
"root");
+ Assert.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR,
processInstanceFinishRes.get(Constants.STATUS));
//success
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> successRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, "2020-02-21 00:00:00", false, "", "", 0, "root");
+ shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0,
"root");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 72dbe39..18d4214 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -95,9 +95,8 @@ public class TaskDefinitionServiceImplTest {
+
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
- List<TaskDefinition> taskDefinitions =
JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class);
-
Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1);
-
Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1);
+ List<TaskDefinitionLog> taskDefinitions =
JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
+ Mockito.when(processService.saveTaskDefine(loginUser, projectCode,
taskDefinitions)).thenReturn(true);
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode,
createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
@@ -250,11 +249,23 @@ public class TaskDefinitionServiceImplTest {
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitionLogs =
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Assert.assertFalse(taskDefinitionLogs.isEmpty());
+ String taskJson =
"[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ + "\"localParams\":[],\"rawScript\":\"echo
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+ +
"\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+ +
"\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
+ +
"\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
2\",\"conditionResult\":{\"successNode\""
+ +
":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\","
+ +
"\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
+ taskDefinitionLogs = JSONUtils.toList(taskJson,
TaskDefinitionLog.class);
+ Assert.assertFalse(taskDefinitionLogs.isEmpty());
String taskParams =
"{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\","
+ "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo
${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}";
ShellParameters parameters = JSONUtils.parseObject(taskParams,
ShellParameters.class);
Assert.assertNotNull(parameters);
+ String params =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
+ ShellParameters parameters1 = JSONUtils.parseObject(params,
ShellParameters.class);
+ Assert.assertNotNull(parameters1);
}
@Test
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 2a1e2ac..0b1edec 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -159,7 +159,6 @@ public class JSONUtils {
}
try {
-
CollectionType listType =
objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
return objectMapper.readValue(json, listType);
} catch (Exception e) {
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index 1dae5d8..27162d0 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.ConditionType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Date;
@@ -25,6 +26,8 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
* process task relation
@@ -86,6 +89,8 @@ public class ProcessTaskRelation {
/**
* condition parameters
*/
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionParams;
/**
@@ -236,19 +241,19 @@ public class ProcessTaskRelation {
@Override
public String toString() {
return "ProcessTaskRelation{"
- + "id=" + id
- + ", name='" + name + '\''
- + ", processDefinitionVersion=" + processDefinitionVersion
- + ", projectCode=" + projectCode
- + ", processDefinitionCode=" + processDefinitionCode
- + ", preTaskCode=" + preTaskCode
- + ", preTaskVersion=" + preTaskVersion
- + ", postTaskCode=" + postTaskCode
- + ", postTaskVersion=" + postTaskVersion
- + ", conditionType=" + conditionType
- + ", conditionParams='" + conditionParams + '\''
- + ", createTime=" + createTime
- + ", updateTime=" + updateTime
- + '}';
+ + "id=" + id
+ + ", name='" + name + '\''
+ + ", processDefinitionVersion=" + processDefinitionVersion
+ + ", projectCode=" + projectCode
+ + ", processDefinitionCode=" + processDefinitionCode
+ + ", preTaskCode=" + preTaskCode
+ + ", preTaskVersion=" + preTaskVersion
+ + ", postTaskCode=" + postTaskCode
+ + ", postTaskVersion=" + postTaskVersion
+ + ", conditionType=" + conditionType
+ + ", conditionParams='" + conditionParams + '\''
+ + ", createTime=" + createTime
+ + ", updateTime=" + updateTime
+ + '}';
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index a8a5ccd..1e479c8 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -38,6 +38,8 @@ import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
* task definition
@@ -89,6 +91,8 @@ public class TaskDefinition {
/**
* user defined parameters
*/
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String taskParams;
/**
@@ -438,12 +442,6 @@ public class TaskDefinition {
}
@Override
- public int hashCode() {
- return Objects.hash(name, description, taskType, taskParams, flag,
taskPriority, workerGroup, failRetryTimes,
- failRetryInterval, timeoutFlag, timeoutNotifyStrategy, timeout,
delayTime, resourceIds);
- }
-
- @Override
public String toString() {
return "TaskDefinition{"
+ "id=" + id
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 48c144d..9f0f312 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -57,6 +57,8 @@ import
org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import
org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@@ -123,6 +125,7 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
+import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -2090,6 +2093,63 @@ public class ProcessService {
return StringUtils.join(resourceIds, ",");
}
+ public boolean saveTaskDefine(User operator, long projectCode,
List<TaskDefinitionLog> taskDefinitionLogs) {
+ Date now = new Date();
+ List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
+ List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ taskDefinitionLog.setProjectCode(projectCode);
+ taskDefinitionLog.setUpdateTime(now);
+ taskDefinitionLog.setOperateTime(now);
+ taskDefinitionLog.setOperator(operator.getId());
+
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
+ if (taskDefinitionLog.getCode() > 0 &&
taskDefinitionLog.getVersion() > 0) {
+ TaskDefinitionLog definitionCodeAndVersion =
taskDefinitionLogMapper
+
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(),
taskDefinitionLog.getVersion());
+ if (definitionCodeAndVersion != null) {
+ if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
+
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
+ Integer version =
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
+ taskDefinitionLog.setVersion(version + 1);
+
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
+ updateTaskDefinitionLogs.add(taskDefinitionLog);
+ }
+ continue;
+ }
+ }
+ taskDefinitionLog.setUserId(operator.getId());
+ taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
+ taskDefinitionLog.setCreateTime(now);
+ if (taskDefinitionLog.getCode() == 0) {
+ try {
+
taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
+ } catch (SnowFlakeException e) {
+ logger.error("Task code get error, ", e);
+ return false;
+ }
+ }
+ newTaskDefinitionLogs.add(taskDefinitionLog);
+ }
+ for (TaskDefinitionLog taskDefinitionToUpdate :
updateTaskDefinitionLogs) {
+ TaskDefinition task =
taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
+ if (task == null) {
+ newTaskDefinitionLogs.add(taskDefinitionToUpdate);
+ } else {
+ int update =
taskDefinitionMapper.updateById(taskDefinitionToUpdate);
+ int insert =
taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
+ if ((update & insert) != 1) {
+ return false;
+ }
+ }
+ }
+ if (!newTaskDefinitionLogs.isEmpty()) {
+ int insert =
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
+ int logInsert =
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+ return (logInsert & insert) != 0;
+ }
+ return true;
+ }
+
/**
* save processDefinition (including create or update processDefinition)
*/
@@ -2116,21 +2176,33 @@ public class ProcessService {
* save task relations
*/
public int saveTaskRelation(User operator, long projectCode, long
processDefinitionCode, int processDefinitionVersion,
- List<ProcessTaskRelationLog> taskRelationList)
{
+ List<ProcessTaskRelationLog> taskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode,
processDefinitionCode);
}
+ Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+ if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+ taskDefinitionLogMap = taskDefinitionLogs.stream()
+ .collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinitionLog -> taskDefinitionLog));
+ }
Date now = new Date();
- taskRelationList.forEach(processTaskRelationLog -> {
+ for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList)
{
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+ if (taskDefinitionLogMap != null) {
+ TaskDefinitionLog taskDefinitionLog =
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+ if (taskDefinitionLog != null) {
+
processTaskRelationLog.setPreTaskVersion(taskDefinitionLog.getVersion());
+ }
+
processTaskRelationLog.setPostTaskVersion(taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()).getVersion());
+ }
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
- });
+ }
int result = processTaskRelationMapper.batchInsert(taskRelationList);
int resultLog =
processTaskRelationLogMapper.batchInsert(taskRelationList);
return result & resultLog;
@@ -2162,7 +2234,7 @@ public class ProcessService {
*/
public DAG<String, TaskNode, TaskNodeRelation>
genDagGraph(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(),
processDefinition.getVersion());
- List<TaskNode> taskNodeList = transformTask(processTaskRelations);
+ List<TaskNode> taskNodeList = transformTask(processTaskRelations,
Lists.newArrayList());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new
ArrayList<>(processTaskRelations));
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
@@ -2288,7 +2360,7 @@ public class ProcessService {
/**
* Use temporarily before refactoring taskNode
*/
- public List<TaskNode> transformTask(List<ProcessTaskRelationLog>
taskRelationList) {
+ public List<TaskNode> transformTask(List<ProcessTaskRelationLog>
taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
Map<Long, List<Long>> taskCodeMap = new HashMap<>();
for (ProcessTaskRelationLog processTaskRelation : taskRelationList) {
taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v)
-> {
@@ -2301,7 +2373,9 @@ public class ProcessService {
return v;
});
}
- List<TaskDefinitionLog> taskDefinitionLogs =
genTaskDefineList(taskRelationList);
+ if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
+ taskDefinitionLogs = genTaskDefineList(taskRelationList);
+ }
Map<Long, TaskDefinitionLog> taskDefinitionLogMap =
taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
List<TaskNode> taskNodeList = new ArrayList<>();
@@ -2318,7 +2392,7 @@ public class ProcessService {
taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap =
taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
- taskNode.setConditionResult((String)
taskParamsMap.get(Constants.CONDITION_RESULT));
+
taskNode.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);