This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 26dad5e2a8 Fix update TaskDefinition error (#12060)
26dad5e2a8 is described below
commit 26dad5e2a8ee66e599e7c7da74bc408d4b9f67f6
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Sep 20 22:23:16 2022 +0800
Fix update TaskDefinition error (#12060)
---
.../service/impl/TaskDefinitionServiceImpl.java | 296 +++++++++++++--------
.../service/process/ProcessServiceImpl.java | 45 +++-
2 files changed, 221 insertions(+), 120 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 9e30359d76..947dfc2639 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -17,8 +17,16 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.permission.PermissionCheck;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -46,7 +54,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
-import org.apache.dolphinscheduler.api.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@@ -73,8 +80,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
-
/**
* task definition service impl
*/
@@ -122,8 +127,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
long projectCode,
String taskDefinitionJson)
{
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
TASK_DEFINITION_CREATE);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION_CREATE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -153,7 +159,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
Map<String, Object> resData = new HashMap<>();
resData.put("total", taskDefinitionLogs.size());
- resData.put("code",
StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()),
","));
+ resData.put("code", StringUtils
+
.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()),
","));
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
return result;
@@ -177,8 +184,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
String
taskDefinitionJsonObj,
String upstreamCodes) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION_CREATE);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION_CREATE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -211,25 +219,27 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
long taskCode = taskDefinition.getCode();
if (taskCode == 0) {
- try {
- taskCode = CodeGenerateUtils.getInstance().genCode();
- taskDefinition.setCode(taskCode);
- } catch (CodeGenerateException e) {
- logger.error("Generate task definition code error.", e);
- putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS,
taskDefinitionJsonObj);
- return result;
- }
+ taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
}
- List<ProcessTaskRelationLog> processTaskRelationLogList =
Lists.newArrayList();
+ List<ProcessTaskRelationLog> processTaskRelationLogList =
+ processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode)
+ .stream()
+ .map(ProcessTaskRelationLog::new)
+ .collect(Collectors.toList());
+
if (StringUtils.isNotBlank(upstreamCodes)) {
- Set<Long> upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+ Set<Long> upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
+ .collect(Collectors.toSet());
List<TaskDefinition> upstreamTaskDefinitionList =
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
- Set<Long> queryUpStreamTaskCodes =
upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet());
+ Set<Long> queryUpStreamTaskCodes =
+
upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet());
// upstreamTaskCodes - queryUpStreamTaskCodes
- Set<Long> diffCode = upstreamTaskCodes.stream().filter(code ->
!queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet());
+ Set<Long> diffCode = upstreamTaskCodes.stream().filter(code ->
!queryUpStreamTaskCodes.contains(code))
+ .collect(Collectors.toSet());
if (!diffCode.isEmpty()) {
String taskCodes = StringUtils.join(diffCode, Constants.COMMA);
- logger.error("Some task definitions with parameter
upstreamCodes do not exist, taskDefinitionCodes:{}.", taskCodes);
+ logger.error("Some task definitions with parameter
upstreamCodes do not exist, taskDefinitionCodes:{}.",
+ taskCodes);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes);
return result;
}
@@ -243,10 +253,6 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLogList.add(processTaskRelationLog);
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
- if (!processTaskRelationList.isEmpty()) {
-
processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
- }
} else {
ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
processTaskRelationLog.setPreTaskCode(0);
@@ -257,22 +263,30 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLogList.add(processTaskRelationLog);
}
- int insertResult = processService.saveTaskRelation(loginUser,
projectCode, processDefinition.getCode(), processDefinition.getVersion(),
- processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
+ int insertResult = processService.saveTaskRelation(loginUser,
projectCode, processDefinition.getCode(),
+ processDefinition.getVersion(),
+ processTaskRelationLogList, Lists.newArrayList(),
Boolean.TRUE);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
- logger.error("Save new version process task relations error,
processDefinitionCode:{}, processDefinitionVersion:{}.",
processDefinition.getCode(), processDefinition.getVersion());
+ logger.error(
+ "Save new version process task relations error,
processDefinitionCode:{}, processDefinitionVersion:{}.",
+ processDefinition.getCode(),
processDefinition.getVersion());
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
} else
- logger.info("Save new version process task relations complete,
processDefinitionCode:{}, processDefinitionVersion:{}.",
processDefinition.getCode(), processDefinition.getVersion());
+ logger.info(
+ "Save new version process task relations complete,
processDefinitionCode:{}, processDefinitionVersion:{}.",
+ processDefinition.getCode(),
processDefinition.getVersion());
- int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE);
+ int saveTaskResult =
+ processService.saveTaskDefine(loginUser, projectCode,
Lists.newArrayList(taskDefinition), Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
- logger.error("Save task definition error, projectCode:{},
taskDefinitionCode:{}.", projectCode, taskDefinition.getCode());
+ logger.error("Save task definition error, projectCode:{},
taskDefinitionCode:{}.", projectCode,
+ taskDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
} else
- logger.info("Save task definition complete, projectCode:{},
taskDefinitionCode:{}.", projectCode, taskDefinition.getCode());
+ logger.info("Save task definition complete, projectCode:{},
taskDefinitionCode:{}.", projectCode,
+ taskDefinition.getCode());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, taskDefinition);
return result;
@@ -287,10 +301,12 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
* @param taskName task name
*/
@Override
- public Map<String, Object> queryTaskDefinitionByName(User loginUser, long
projectCode, long processCode, String taskName) {
+ public Map<String, Object> queryTaskDefinitionByName(User loginUser, long
projectCode, long processCode,
+ String taskName) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -318,8 +334,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
@Override
public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long
projectCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION_DELETE);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION_DELETE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -335,33 +352,42 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag()
== Flag.YES) {
- logger.warn("Task definition can not be deleted due to task state
online, taskDefinitionCode:{}.", taskCode);
+ logger.warn("Task definition can not be deleted due to task state
online, taskDefinitionCode:{}.",
+ taskCode);
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
+ List<ProcessTaskRelation> processTaskRelationList =
+ processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> postTaskCodes = processTaskRelationList
.stream()
.map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet());
String postTaskCodesStr = StringUtils.join(postTaskCodes, ",");
- logger.warn("Task definition can not be deleted due to downstream
tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}",
+ logger.warn(
+ "Task definition can not be deleted due to downstream
tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}",
taskCode, postTaskCodesStr);
putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr);
return result;
}
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
- List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ List<ProcessTaskRelation> taskRelationList =
+ processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode);
if (!taskRelationList.isEmpty()) {
- logger.info("Task definition has upstream tasks, start handle
them after delete task, taskDefinitionCode:{}.", taskCode);
+ logger.info(
+ "Task definition has upstream tasks, start handle them
after delete task, taskDefinitionCode:{}.",
+ taskCode);
long processDefinitionCode =
taskRelationList.get(0).getProcessDefinitionCode();
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
- List<ProcessTaskRelation> relationList =
processTaskRelations.stream().filter(r -> r.getPostTaskCode() !=
taskCode).collect(Collectors.toList());
+ List<ProcessTaskRelation> processTaskRelations =
+
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> relationList =
processTaskRelations.stream()
+ .filter(r -> r.getPostTaskCode() !=
taskCode).collect(Collectors.toList());
updateDag(loginUser, result, processDefinitionCode,
relationList, Lists.newArrayList());
} else {
- logger.info("Task definition delete complete, projectCode:{},
taskDefinitionCode:{}.", projectCode, taskCode);
+ logger.info("Task definition delete complete, projectCode:{},
taskDefinitionCode:{}.", projectCode,
+ taskCode);
putMsg(result, Status.SUCCESS);
}
} else {
@@ -372,7 +398,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
- private void updateDag(User loginUser, Map<String, Object> result, long
processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
+ private void updateDag(User loginUser, Map<String, Object> result, long
processDefinitionCode,
+ List<ProcessTaskRelation> processTaskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
@@ -381,19 +408,27 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
- logger.error("Update process definition error, projectCode:{},
processDefinitionCode:{}.", processDefinition.getProjectCode(),
processDefinitionCode);
+ logger.error("Update process definition error, projectCode:{},
processDefinitionCode:{}.",
+ processDefinition.getProjectCode(), processDefinitionCode);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} else
- logger.info("Save new version process definition complete,
projectCode:{}, processDefinitionCode:{}, newVersion:{}.",
processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
- List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
- int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(),
+ logger.info(
+ "Save new version process definition complete,
projectCode:{}, processDefinitionCode:{}, newVersion:{}.",
+ processDefinition.getProjectCode(), processDefinitionCode,
insertVersion);
+ List<ProcessTaskRelationLog> relationLogs =
+
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+ int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
+ processDefinition.getCode(),
insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
- logger.info("Save new version task relations complete,
projectCode:{}, processDefinitionCode:{}, newVersion:{}.",
processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
+ logger.info(
+ "Save new version task relations complete, projectCode:{},
processDefinitionCode:{}, newVersion:{}.",
+ processDefinition.getProjectCode(), processDefinitionCode,
insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
- logger.error("Update task relations error, projectCode:{},
processDefinitionCode:{}.", processDefinition.getProjectCode(),
processDefinitionCode);
+ logger.error("Update task relations error, projectCode:{},
processDefinitionCode:{}.",
+ processDefinition.getProjectCode(), processDefinitionCode);
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
@@ -409,18 +444,25 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
*/
@Transactional
@Override
- public Map<String, Object> updateTaskDefinition(User loginUser, long
projectCode, long taskCode, String taskDefinitionJsonObj) {
+ public Map<String, Object> updateTaskDefinition(User loginUser, long
projectCode, long taskCode,
+ String
taskDefinitionJsonObj) {
Map<String, Object> result = new HashMap<>();
- TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser,
projectCode, taskCode, taskDefinitionJsonObj, result);
+ TaskDefinitionLog taskDefinitionToUpdate =
+ updateTask(loginUser, projectCode, taskCode,
taskDefinitionJsonObj, result);
if (taskDefinitionToUpdate == null) {
return result;
}
- List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ List<ProcessTaskRelation> taskRelationList =
+ processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode);
if (!taskRelationList.isEmpty()) {
- logger.info("Task definition has upstream tasks, start handle them
after update task, taskDefinitionCode:{}.", taskCode);
+ logger.info(
+ "Task definition has upstream tasks, start handle them
after update task, taskDefinitionCode:{}.",
+ taskCode);
long processDefinitionCode =
taskRelationList.get(0).getProcessDefinitionCode();
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
- updateDag(loginUser, result, processDefinitionCode,
processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
+ List<ProcessTaskRelation> processTaskRelations =
+ processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ updateDag(loginUser, result, processDefinitionCode,
processTaskRelations,
+ Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info("Update task definition complete, projectCode:{},
taskDefinitionCode:{}.", projectCode, taskCode);
result.put(Constants.DATA_LIST, taskCode);
@@ -428,10 +470,11 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
- private TaskDefinitionLog updateTask(User loginUser, long projectCode,
long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
+ private TaskDefinitionLog updateTask(User loginUser, long projectCode,
long taskCode, String taskDefinitionJsonObj,
+ Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- result.putAll(projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION_UPDATE));
+ // check user access for project
+ result.putAll(projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION_UPDATE));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return null;
}
@@ -444,12 +487,14 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag()
== Flag.YES) {
// if stream, can update task definition without online check
if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM)
{
- logger.warn("Only {} type task can be updated without online
check, taskDefinitionCode:{}.", TaskExecuteType.STREAM, taskCode);
+ logger.warn("Only {} type task can be updated without online
check, taskDefinitionCode:{}.",
+ TaskExecuteType.STREAM, taskCode);
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
return null;
}
}
- TaskDefinitionLog taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+ TaskDefinitionLog taskDefinitionToUpdate =
+ JSONUtils.parseObject(taskDefinitionJsonObj,
TaskDefinitionLog.class);
if (taskDefinition.equals(taskDefinitionToUpdate)) {
logger.warn("Task definition does not need update because no
change, taskDefinitionCode:{}.", taskCode);
return null;
@@ -464,13 +509,15 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
.taskParams(taskDefinitionToUpdate.getTaskParams())
.dependence(taskDefinitionToUpdate.getDependence())
.build())) {
- logger.warn("Task definition parameters are invalid,
taskDefinitionName:{}.", taskDefinitionToUpdate.getName());
+ logger.warn("Task definition parameters are invalid,
taskDefinitionName:{}.",
+ taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionToUpdate.getName());
return null;
}
Integer version =
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
- logger.error("Max version task definitionLog can not be found in
database, taskDefinitionCode:{}.", taskCode);
+ logger.error("Max version task definitionLog can not be found in
database, taskDefinitionCode:{}.",
+ taskCode);
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
return null;
}
@@ -490,11 +537,13 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
taskDefinitionToUpdate.setId(null);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) {
- logger.error("Update task definition or definitionLog error,
projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
+ logger.error("Update task definition or definitionLog error,
projectCode:{}, taskDefinitionCode:{}.",
+ projectCode, taskCode);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} else
- logger.info("Update task definition and definitionLog complete,
projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
+ logger.info(
+ "Update task definition and definitionLog complete,
projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
projectCode, taskCode,
taskDefinitionToUpdate.getVersion());
return taskDefinitionToUpdate;
}
@@ -510,17 +559,22 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
* @return update result code
*/
@Override
- public Map<String, Object> updateTaskWithUpstream(User loginUser, long
projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes)
{
+ public Map<String, Object> updateTaskWithUpstream(User loginUser, long
projectCode, long taskCode,
+ String
taskDefinitionJsonObj, String upstreamCodes) {
Map<String, Object> result = new HashMap<>();
- TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser,
projectCode, taskCode, taskDefinitionJsonObj, result);
+ TaskDefinitionLog taskDefinitionToUpdate =
+ updateTask(loginUser, projectCode, taskCode,
taskDefinitionJsonObj, result);
if (result.get(Constants.STATUS) != Status.SUCCESS &&
taskDefinitionToUpdate == null) {
return result;
}
- List<ProcessTaskRelation> upstreamTaskRelations =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
- Set<Long> upstreamCodeSet =
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
+ List<ProcessTaskRelation> upstreamTaskRelations =
+ processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode);
+ Set<Long> upstreamCodeSet =
+
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
Set<Long> upstreamTaskCodes = Collections.emptySet();
if (StringUtils.isNotEmpty(upstreamCodes)) {
- upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+ upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
+ .collect(Collectors.toSet());
}
if (CollectionUtils.isEqualCollection(upstreamCodeSet,
upstreamTaskCodes) && taskDefinitionToUpdate == null) {
putMsg(result, Status.SUCCESS);
@@ -533,12 +587,14 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
if (!upstreamTaskCodes.isEmpty()) {
List<TaskDefinition> upstreamTaskDefinitionList =
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
- queryUpStreamTaskCodeMap =
upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinition -> taskDefinition));
+ queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream()
+ .collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinition -> taskDefinition));
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
if (!upstreamTaskCodes.isEmpty()) {
String notExistTaskCodes = StringUtils.join(upstreamTaskCodes,
Constants.COMMA);
- logger.error("Some task definitions in parameter
upstreamTaskCodes do not exist, notExistTaskCodes:{}.", notExistTaskCodes);
+ logger.error("Some task definitions in parameter
upstreamTaskCodes do not exist, notExistTaskCodes:{}.",
+ notExistTaskCodes);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
notExistTaskCodes);
return result;
}
@@ -547,12 +603,14 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
if (!upstreamTaskRelations.isEmpty()) {
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
taskRelation.getProcessDefinitionCode());
+ List<ProcessTaskRelation> processTaskRelations =
+ processTaskRelationMapper.queryByProcessCode(projectCode,
taskRelation.getProcessDefinitionCode());
List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> relationList = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
if (processTaskRelation.getPostTaskCode() == taskCode) {
- if
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) &&
processTaskRelation.getPreTaskCode() != 0L) {
+ if
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode())
+ && processTaskRelation.getPreTaskCode() != 0L) {
queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
} else {
processTaskRelation.setPreTaskCode(0L);
@@ -570,16 +628,17 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
if (queryUpStreamTaskCodeMap.isEmpty() &&
!processTaskRelationList.isEmpty()) {
processTaskRelationList.add(processTaskRelationList.get(0));
}
- updateDag(loginUser, result,
taskRelation.getProcessDefinitionCode(), processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
+ updateDag(loginUser, result,
taskRelation.getProcessDefinitionCode(), processTaskRelations,
+ Lists.newArrayList(taskDefinitionToUpdate));
}
- logger.info("Update task with upstream tasks complete, projectCode:{},
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
+ logger.info(
+ "Update task with upstream tasks complete, projectCode:{},
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
projectCode, taskCode, upstreamTaskCodes);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
-
/**
* switch task definition
*
@@ -592,13 +651,15 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
@Override
public Map<String, Object> switchVersion(User loginUser, long projectCode,
long taskCode, int version) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser,
project,projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, WORKFLOW_SWITCH_TO_THIS_VERSION);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (processService.isTaskOnline(taskCode)) {
- logger.warn("Task definition version can not be switched due to
process definition is {}, taskDefinitionCode:{}.",
+ logger.warn(
+ "Task definition version can not be switched due to
process definition is {}, taskDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), taskCode);
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
@@ -609,20 +670,28 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
String.valueOf(taskCode));
return result;
}
- TaskDefinitionLog taskDefinitionUpdate =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
+ TaskDefinitionLog taskDefinitionUpdate =
+
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
taskDefinitionUpdate.setUserId(loginUser.getId());
taskDefinitionUpdate.setUpdateTime(new Date());
taskDefinitionUpdate.setId(taskDefinition.getId());
int switchVersion =
taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (switchVersion > 0) {
- List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ List<ProcessTaskRelation> taskRelationList =
+ processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode);
if (!taskRelationList.isEmpty()) {
- logger.info("Task definition has upstream tasks, start handle
them after switch task, taskDefinitionCode:{}.", taskCode);
+ logger.info(
+ "Task definition has upstream tasks, start handle them
after switch task, taskDefinitionCode:{}.",
+ taskCode);
long processDefinitionCode =
taskRelationList.get(0).getProcessDefinitionCode();
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
- updateDag(loginUser, result, processDefinitionCode,
processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
+ List<ProcessTaskRelation> processTaskRelations =
+
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ updateDag(loginUser, result, processDefinitionCode,
processTaskRelations,
+ Lists.newArrayList(taskDefinitionUpdate));
} else {
- logger.info("Task definition version switch complete, switch
task version to {}, taskDefinitionCode:{}.", version, taskCode);
+ logger.info(
+ "Task definition version switch complete, switch task
version to {}, taskDefinitionCode:{}.",
+ version, taskCode);
putMsg(result, Status.SUCCESS);
}
} else {
@@ -641,7 +710,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
- Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_VERSION_VIEW);
+ Map<String, Object> checkResult =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_VERSION_VIEW);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result, resultStatus);
@@ -649,7 +719,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
PageInfo<TaskDefinitionLog> pageInfo = new PageInfo<>(pageNo,
pageSize);
Page<TaskDefinitionLog> page = new Page<>(pageNo, pageSize);
- IPage<TaskDefinitionLog> taskDefinitionVersionsPaging =
taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode,
projectCode);
+ IPage<TaskDefinitionLog> taskDefinitionVersionsPaging =
+
taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode,
projectCode);
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(taskDefinitionLogs);
@@ -662,8 +733,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
@Override
public Map<String, Object> deleteByCodeAndVersion(User loginUser, long
projectCode, long taskCode, int version) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION_DELETE);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION_DELETE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -674,14 +746,16 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
String.valueOf(taskCode));
} else {
if (taskDefinition.getVersion() == version) {
- logger.warn("Task definition can not be deleted due to version
is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.",
+ logger.warn(
+ "Task definition can not be deleted due to version is
being used, projectCode:{}, taskDefinitionCode:{}, version:{}.",
projectCode, taskCode, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
}
int delete =
taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version);
if (delete > 0) {
- logger.info("Task definition version delete complete,
projectCode:{}, taskDefinitionCode:{}, version:{}.",
+ logger.info(
+ "Task definition version delete complete,
projectCode:{}, taskDefinitionCode:{}, version:{}.",
projectCode, taskCode, version);
putMsg(result, Status.SUCCESS);
} else {
@@ -696,8 +770,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
@Override
public Map<String, Object> queryTaskDefinitionDetail(User loginUser, long
projectCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION);
+ // check user access for project
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -724,8 +799,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
Integer pageSize) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_DEFINITION);
+ // check user access for project
+ Map<String, Object> checkResult =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_DEFINITION);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result, resultStatus);
@@ -733,8 +809,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
taskType = taskType == null ? StringUtils.EMPTY : taskType;
Page<TaskMainInfo> page = new Page<>(pageNo, pageSize);
- IPage<TaskMainInfo> taskMainInfoIPage =
taskDefinitionMapper.queryDefineListPaging(page, projectCode,
searchWorkflowName,
- searchTaskName, taskType, taskExecuteType);
+ IPage<TaskMainInfo> taskMainInfoIPage =
+ taskDefinitionMapper.queryDefineListPaging(page, projectCode,
searchWorkflowName,
+ searchTaskName, taskType, taskExecuteType);
List<TaskMainInfo> records = taskMainInfoIPage.getRecords();
if (!records.isEmpty()) {
Map<Long, TaskMainInfo> taskMainInfoMap = new HashMap<>();
@@ -800,10 +877,11 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
*/
@Transactional
@Override
- public Map<String, Object> releaseTaskDefinition(User loginUser, long
projectCode, long code, ReleaseState releaseState) {
+ public Map<String, Object> releaseTaskDefinition(User loginUser, long
projectCode, long code,
+ ReleaseState
releaseState) {
Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,null);
+ // check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
Status resultStatus = (Status) result.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return result;
@@ -817,7 +895,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
- TaskDefinitionLog taskDefinitionLog =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code,
taskDefinition.getVersion());
+ TaskDefinitionLog taskDefinitionLog =
+ taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code,
taskDefinition.getVersion());
if (taskDefinitionLog == null) {
logger.error("Task definition does not exist,
taskDefinitionCode:{}.", code);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
@@ -831,8 +910,10 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
case ONLINE:
String resourceIds = taskDefinition.getResourceIds();
if (StringUtils.isNotBlank(resourceIds)) {
- Integer[] resourceIdArray =
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
- PermissionCheck<Integer> permissionCheck = new
PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, processService,
resourceIdArray, loginUser.getId(), logger);
+ Integer[] resourceIdArray =
+
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
+ PermissionCheck<Integer> permissionCheck = new
PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,
+ processService, resourceIdArray,
loginUser.getId(), logger);
try {
permissionCheck.checkPermission();
} catch (Exception e) {
@@ -856,7 +937,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
- logger.error("Update taskDefinition state or taskDefinitionLog state
to complete, taskDefinitionCode:{}.", code);
+ logger.error("Update taskDefinition state or taskDefinitionLog state
to complete, taskDefinitionCode:{}.",
+ code);
putMsg(result, Status.SUCCESS);
return result;
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 6536a6b4ed..09ccbd7d10 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -602,7 +602,8 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public void removeTaskLogFile(Integer processInstanceId) {
ProcessInstance processInstance =
processInstanceMapper.selectById(processInstanceId);
- List<TaskInstance> taskInstanceList =
findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag());
+ List<TaskInstance> taskInstanceList =
+ findValidTaskListByProcessId(processInstanceId,
processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@@ -623,7 +624,8 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public void deleteWorkTaskInstanceByProcessInstanceId(int
processInstanceId) {
ProcessInstance processInstance =
processInstanceMapper.selectById(processInstanceId);
- List<TaskInstance> taskInstanceList =
findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag());
+ List<TaskInstance> taskInstanceList =
+ findValidTaskListByProcessId(processInstanceId,
processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@@ -1037,7 +1039,8 @@ public class ProcessServiceImpl implements ProcessService
{
case COMPLEMENT_DATA:
// delete all the valid tasks when complement data if id is
not null
if (processInstance.getId() != null) {
- List<TaskInstance> taskInstanceList =
this.findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
+ List<TaskInstance> taskInstanceList =
+
this.findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO);
this.updateTaskInstance(taskInstance);
@@ -1051,7 +1054,8 @@ public class ProcessServiceImpl implements ProcessService
{
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
// delete all the valid tasks when repeat running
- List<TaskInstance> validTaskList =
findValidTaskListByProcessId(processInstance.getId(),processInstance.getTestFlag());
+ List<TaskInstance> validTaskList =
+ findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
for (TaskInstance taskInstance : validTaskList) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
@@ -1654,7 +1658,8 @@ public class ProcessServiceImpl implements ProcessService
{
if (failureStrategy == FailureStrategy.CONTINUE) {
return true;
}
- List<TaskInstance> taskInstances =
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(),taskInstance.getTestFlag());
+ List<TaskInstance> taskInstances =
+
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(),
taskInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
if (task.getState() == TaskExecutionStatus.FAILURE
@@ -1862,7 +1867,8 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer
processInstanceId) {
ProcessInstance processInstance =
processInstanceMapper.selectById(processInstanceId);
- return
taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
processInstance.getTestFlag());
+ return
taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
+ processInstance.getTestFlag());
}
/**
@@ -2194,7 +2200,8 @@ public class ProcessServiceImpl implements ProcessService
{
* @return process instance
*/
@Override
- public ProcessInstance findLastSchedulerProcessInterval(Long
definitionCode, DateInterval dateInterval, int testFlag) {
+ public ProcessInstance findLastSchedulerProcessInterval(Long
definitionCode, DateInterval dateInterval,
+ int testFlag) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
@@ -2535,6 +2542,7 @@ public class ProcessServiceImpl implements ProcessService
{
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
+
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
List<Long> taskDefinitionCodes = updateTaskDefinitionLogs
.stream()
@@ -2554,15 +2562,24 @@ public class ProcessServiceImpl implements
ProcessService {
// for each taskDefinitionLog, we will insert a new version into db
// and update the origin one if exist
- int updateResult = updateTaskDefinitionLogs.size();
- int insertResult = newTaskDefinitionLogs.size();
- if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
- insertResult =
taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
+ int updateResult = 0;
+ int insertResult = 0;
+ if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) {
+ insertResult +=
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+ }
+ if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
+ insertResult +=
taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs);
}
if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) &&
Boolean.TRUE.equals(syncDefine)) {
updateResult +=
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
}
+ if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) &&
Boolean.TRUE.equals(syncDefine)) {
+ for (TaskDefinitionLog taskDefinitionLog :
updateTaskDefinitionLogs) {
+ updateResult +=
taskDefinitionMapper.updateById(taskDefinitionLog);
+ }
+ }
+
return (insertResult & updateResult) > 0 ? 1 :
Constants.EXIT_CODE_SUCCESS;
}
@@ -3157,7 +3174,8 @@ public class ProcessServiceImpl implements ProcessService
{
ProcessInstance processInstance =
findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null
&& (processInstance.getState().isFailure() ||
processInstance.getState().isStop())) {
- List<TaskInstance> validTaskList =
findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
+ List<TaskInstance> validTaskList =
+ findValidTaskListByProcessId(processInstance.getId(),
processInstance.getTestFlag());
List<Long> instanceTaskCodeList =
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations =
findRelationByCode(processInstance.getProcessDefinitionCode(),
@@ -3183,7 +3201,8 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public Integer queryTestDataSourceId(Integer onlineDataSourceId) {
Integer testDataSourceId =
dataSourceMapper.queryTestDataSourceId(onlineDataSourceId);
- if (testDataSourceId!=null) return testDataSourceId;
+ if (testDataSourceId != null)
+ return testDataSourceId;
return null;
}
}