This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8439b5dc69 [fix-12721] Fix cannot modify the upstream task in task
definition page. (#12722)
8439b5dc69 is described below
commit 8439b5dc69f2a7dfc1043393bbbb21a8277715da
Author: jackfanwan <[email protected]>
AuthorDate: Sun Jan 15 19:58:47 2023 +0800
[fix-12721] Fix cannot modify the upstream task in task definition page.
(#12722)
---
.../service/impl/TaskDefinitionServiceImpl.java | 100 ++++++++++++++++++++-
.../api/service/TaskDefinitionServiceImplTest.java | 82 +++++++++++++++--
.../dao/repository/ProcessTaskRelationLogDao.java | 10 +++
.../impl/ProcessTaskRelationLogDaoImpl.java | 5 ++
4 files changed, 186 insertions(+), 11 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 7dd38bba05..e79507c6b5 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -63,6 +63,7 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -82,6 +83,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -122,6 +124,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
+ @Autowired
+ private ProcessTaskRelationLogDao processTaskRelationLogDao;
+
@Autowired
private ProcessTaskRelationService processTaskRelationService;
@@ -852,10 +857,15 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
} else {
queryUpStreamTaskCodeMap = new HashMap<>();
}
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
+ if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) {
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
taskRelation.getProcessDefinitionCode());
+
+ // set upstream code list
+ updateUpstreamTask(new
HashSet<>(queryUpStreamTaskCodeMap.keySet()),
+ taskCode, projectCode,
taskRelation.getProcessDefinitionCode(), loginUser);
+
List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> relationList = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
@@ -879,8 +889,6 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) &&
CollectionUtils.isNotEmpty(processTaskRelationList)) {
processTaskRelationList.add(processTaskRelationList.get(0));
}
- updateDag(loginUser, taskRelation.getProcessDefinitionCode(),
processTaskRelations,
- Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info(
"Update task with upstream tasks complete, projectCode:{},
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
@@ -890,6 +898,92 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
+ private void updateUpstreamTask(Set<Long> allPreTaskCodeSet, long
taskCode, long projectCode,
+ long processDefinitionCode, User
loginUser) {
+ // query all process task relation
+ List<ProcessTaskRelation> hadProcessTaskRelationList =
processTaskRelationMapper
+ .queryUpstreamByCode(projectCode, taskCode);
+ // remove pre
+ Set<Long> removePreTaskSet = new HashSet<>();
+ List<ProcessTaskRelation> removePreTaskList = new ArrayList<>();
+ // add pre
+ Set<Long> addPreTaskSet = new HashSet<>();
+ List<ProcessTaskRelation> addPreTaskList = new ArrayList<>();
+
+ List<ProcessTaskRelationLog> processTaskRelationLogList = new
ArrayList<>();
+
+ // filter all process task relation
+ if (CollectionUtils.isNotEmpty(hadProcessTaskRelationList)) {
+ for (ProcessTaskRelation processTaskRelation :
hadProcessTaskRelationList) {
+ if (processTaskRelation.getPreTaskCode() == 0) {
+ continue;
+ }
+ // had
+ if
(allPreTaskCodeSet.contains(processTaskRelation.getPreTaskCode())) {
+
allPreTaskCodeSet.remove(processTaskRelation.getPreTaskCode());
+ } else {
+ // remove
+ removePreTaskSet.add(processTaskRelation.getPreTaskCode());
+ processTaskRelation.setPreTaskCode(0);
+ processTaskRelation.setPreTaskVersion(0);
+ removePreTaskList.add(processTaskRelation);
+
processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser,
processTaskRelation));
+ }
+ }
+ }
+ // add
+ if (allPreTaskCodeSet.size() != 0) {
+ addPreTaskSet.addAll(allPreTaskCodeSet);
+ }
+ // get add task code map
+ allPreTaskCodeSet.add(Long.valueOf(taskCode));
+ List<TaskDefinition> taskDefinitionList =
taskDefinitionMapper.queryByCodeList(allPreTaskCodeSet);
+ Map<Long, TaskDefinition> taskCodeMap =
taskDefinitionList.stream().collect(Collectors
+ .toMap(TaskDefinition::getCode, Function.identity(), (a, b) ->
a));
+
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
+ TaskDefinition taskDefinition = taskCodeMap.get(taskCode);
+
+ for (Long preTaskCode : addPreTaskSet) {
+ TaskDefinition preTaskRelation = taskCodeMap.get(preTaskCode);
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(
+ null, processDefinition.getVersion(), projectCode,
processDefinition.getCode(),
+ preTaskRelation.getCode(), preTaskRelation.getVersion(),
+ taskDefinition.getCode(), taskDefinition.getVersion(),
ConditionType.NONE, "{}");
+ addPreTaskList.add(processTaskRelation);
+
processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser,
processTaskRelation));
+ }
+ int insert = 0;
+ int remove = 0;
+ int log = 0;
+ // insert process task relation table data
+ if (CollectionUtils.isNotEmpty(addPreTaskList)) {
+ insert = processTaskRelationMapper.batchInsert(addPreTaskList);
+ }
+ if (CollectionUtils.isNotEmpty(removePreTaskList)) {
+ for (ProcessTaskRelation processTaskRelation : removePreTaskList) {
+ remove +=
processTaskRelationMapper.updateById(processTaskRelation);
+ }
+ }
+ if (CollectionUtils.isNotEmpty(processTaskRelationLogList)) {
+ log =
processTaskRelationLogDao.batchInsert(processTaskRelationLogList);
+ }
+ if (insert + remove != log) {
+ throw new RuntimeException("updateUpstreamTask error");
+ }
+ }
+
+ private ProcessTaskRelationLog createProcessTaskRelationLog(User loginUser,
+
ProcessTaskRelation processTaskRelation) {
+ Date now = new Date();
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLog.setCreateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ return processTaskRelationLog;
+ }
+
/**
* switch task definition
*
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index a066b81124..391769627f 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -56,12 +56,14 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -124,11 +126,15 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessDefinitionService processDefinitionService;
+ @Mock
+ private ProcessTaskRelationLogDao processTaskRelationLogDao;
+
private static final String TASK_PARAMETER =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
private static final long PROJECT_CODE = 1L;
private static final long PROCESS_DEFINITION_CODE = 2L;
private static final long TASK_CODE = 3L;
+ private static final String UPSTREAM_CODE = "3,5";
private static final int VERSION = 1;
private static final int RESOURCE_RATE = -1;
protected User user;
@@ -169,14 +175,7 @@ public class TaskDefinitionServiceImplTest {
@Test
public void updateTaskDefinition() {
- String taskDefinitionJson =
-
"{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
- +
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
- +
"\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
- + "\\\"echo
${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
- +
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
- +
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
- +
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}";
+ String taskDefinitionJson = getTaskDefinitionJson();;
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
@@ -591,6 +590,52 @@ public class TaskDefinitionServiceImplTest {
Assertions.assertDoesNotThrow(() ->
taskDefinitionService.getTaskDefinition(user, TASK_CODE));
}
+ @Test
+ public void testUpdateTaskWithUpstream() {
+
+ String taskDefinitionJson = getTaskDefinitionJson();
+ TaskDefinition taskDefinition = getTaskDefinition();
+ taskDefinition.setFlag(Flag.NO);
+ TaskDefinition taskDefinitionSecond = getTaskDefinition();
+ taskDefinitionSecond.setCode(5);
+
+ user.setUserType(UserType.ADMIN_USER);
+
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
+ Mockito.when(projectService.hasProjectAndWritePerm(user, getProject(),
new HashMap<>())).thenReturn(true);
+
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
+
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
+
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
+
Mockito.when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1);
+
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
+
+ Mockito.when(taskDefinitionMapper.queryByCodeList(Mockito.anySet()))
+ .thenReturn(Arrays.asList(taskDefinition,
taskDefinitionSecond));
+
+
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE,
TASK_CODE))
+ .thenReturn(getProcessTaskRelationListV2());
+
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
+
Mockito.when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1);
+
Mockito.when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1);
+
Mockito.when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2);
+ // success
+ Map<String, Object> successMap =
taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE,
+ taskDefinitionJson, UPSTREAM_CODE);
+ Assertions.assertEquals(Status.SUCCESS,
successMap.get(Constants.STATUS));
+ user.setUserType(UserType.GENERAL_USER);
+ }
+
+ private String getTaskDefinitionJson() {
+ return
"{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ +
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\","
+ + "\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\","
+ +
"\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":\\\"echo
${datetime}\\\","
+ +
"\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ +
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\","
+ +
"\"flag\":0,\"taskPriority\":0,\"workerGroup\":\"default\",\"failRetryTimes\":0,"
+ +
"\"failRetryInterval\":0,\"timeoutFlag\":0,\"timeoutNotifyStrategy\":0,\"timeout\":0,"
+ + "\"delayTime\":0,\"resourceIds\":\"\"}";
+ }
+
/**
* create admin user
*/
@@ -663,6 +708,27 @@ public class TaskDefinitionServiceImplTest {
return processTaskRelationList;
}
+ private List<ProcessTaskRelation> getProcessTaskRelationListV2() {
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ fillProcessTaskRelation(processTaskRelation);
+
+ processTaskRelationList.add(processTaskRelation);
+ processTaskRelation = new ProcessTaskRelation();
+ fillProcessTaskRelation(processTaskRelation);
+ processTaskRelation.setPreTaskCode(4L);
+ processTaskRelationList.add(processTaskRelation);
+ return processTaskRelationList;
+ }
+
+ private void fillProcessTaskRelation(ProcessTaskRelation
processTaskRelation) {
+ processTaskRelation.setProjectCode(PROJECT_CODE);
+ processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+ processTaskRelation.setPreTaskCode(TASK_CODE);
+ processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
+ }
+
private List<ProcessTaskRelationLog> getProcessTaskRelationLogList() {
List<ProcessTaskRelationLog> processTaskRelationLogList = new
ArrayList<>();
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
index 1c1ba25783..d835cdd3ac 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.ibatis.annotations.Param;
+
import java.util.List;
public interface ProcessTaskRelationLogDao {
@@ -26,4 +28,12 @@ public interface ProcessTaskRelationLogDao {
List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long
workflowDefinitionCode);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+
+ /**
+ * batch insert process task relation
+ *
+ * @param taskRelationList taskRelationList
+ * @return int
+ */
+ int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog>
taskRelationList);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
index d64f67ad7d..223fa5efbc 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
@@ -41,4 +41,9 @@ public class ProcessTaskRelationLogDaoImpl implements
ProcessTaskRelationLogDao
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
}
+
+ @Override
+ public int batchInsert(List<ProcessTaskRelationLog> taskRelationList) {
+ return processTaskRelationLogMapper.batchInsert(taskRelationList);
+ }
}