This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8439b5dc69 [fix-12721] Fix cannot modify the upstream task in task 
definition page. (#12722)
8439b5dc69 is described below

commit 8439b5dc69f2a7dfc1043393bbbb21a8277715da
Author: jackfanwan <[email protected]>
AuthorDate: Sun Jan 15 19:58:47 2023 +0800

    [fix-12721] Fix cannot modify the upstream task in task definition page. 
(#12722)
---
 .../service/impl/TaskDefinitionServiceImpl.java    | 100 ++++++++++++++++++++-
 .../api/service/TaskDefinitionServiceImplTest.java |  82 +++++++++++++++--
 .../dao/repository/ProcessTaskRelationLogDao.java  |  10 +++
 .../impl/ProcessTaskRelationLogDaoImpl.java        |   5 ++
 4 files changed, 186 insertions(+), 11 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 7dd38bba05..e79507c6b5 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -63,6 +63,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -82,6 +83,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -122,6 +124,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Autowired
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
+    @Autowired
+    private ProcessTaskRelationLogDao processTaskRelationLogDao;
+
     @Autowired
     private ProcessTaskRelationService processTaskRelationService;
 
@@ -852,10 +857,15 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         } else {
             queryUpStreamTaskCodeMap = new HashMap<>();
         }
-        if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
+        if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) {
             ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
             List<ProcessTaskRelation> processTaskRelations =
                     processTaskRelationMapper.queryByProcessCode(projectCode, 
taskRelation.getProcessDefinitionCode());
+
+            // set upstream code list
+            updateUpstreamTask(new 
HashSet<>(queryUpStreamTaskCodeMap.keySet()),
+                    taskCode, projectCode, 
taskRelation.getProcessDefinitionCode(), loginUser);
+
             List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
             List<ProcessTaskRelation> relationList = Lists.newArrayList();
             for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
@@ -879,8 +889,6 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) && 
CollectionUtils.isNotEmpty(processTaskRelationList)) {
                 processTaskRelationList.add(processTaskRelationList.get(0));
             }
-            updateDag(loginUser, taskRelation.getProcessDefinitionCode(), 
processTaskRelations,
-                    Lists.newArrayList(taskDefinitionToUpdate));
         }
         logger.info(
                 "Update task with upstream tasks complete, projectCode:{}, 
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
@@ -890,6 +898,92 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         return result;
     }
 
+    private void updateUpstreamTask(Set<Long> allPreTaskCodeSet, long 
taskCode, long projectCode,
+                                    long processDefinitionCode, User 
loginUser) {
+        // query all process task relation
+        List<ProcessTaskRelation> hadProcessTaskRelationList = 
processTaskRelationMapper
+                .queryUpstreamByCode(projectCode, taskCode);
+        // remove pre
+        Set<Long> removePreTaskSet = new HashSet<>();
+        List<ProcessTaskRelation> removePreTaskList = new ArrayList<>();
+        // add pre
+        Set<Long> addPreTaskSet = new HashSet<>();
+        List<ProcessTaskRelation> addPreTaskList = new ArrayList<>();
+
+        List<ProcessTaskRelationLog> processTaskRelationLogList = new 
ArrayList<>();
+
+        // filter all process task relation
+        if (CollectionUtils.isNotEmpty(hadProcessTaskRelationList)) {
+            for (ProcessTaskRelation processTaskRelation : 
hadProcessTaskRelationList) {
+                if (processTaskRelation.getPreTaskCode() == 0) {
+                    continue;
+                }
+                // had
+                if 
(allPreTaskCodeSet.contains(processTaskRelation.getPreTaskCode())) {
+                    
allPreTaskCodeSet.remove(processTaskRelation.getPreTaskCode());
+                } else {
+                    // remove
+                    removePreTaskSet.add(processTaskRelation.getPreTaskCode());
+                    processTaskRelation.setPreTaskCode(0);
+                    processTaskRelation.setPreTaskVersion(0);
+                    removePreTaskList.add(processTaskRelation);
+                    
processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser, 
processTaskRelation));
+                }
+            }
+        }
+        // add
+        if (allPreTaskCodeSet.size() != 0) {
+            addPreTaskSet.addAll(allPreTaskCodeSet);
+        }
+        // get add task code map
+        allPreTaskCodeSet.add(Long.valueOf(taskCode));
+        List<TaskDefinition> taskDefinitionList = 
taskDefinitionMapper.queryByCodeList(allPreTaskCodeSet);
+        Map<Long, TaskDefinition> taskCodeMap = 
taskDefinitionList.stream().collect(Collectors
+                .toMap(TaskDefinition::getCode, Function.identity(), (a, b) -> 
a));
+
+        ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(processDefinitionCode);
+        TaskDefinition taskDefinition = taskCodeMap.get(taskCode);
+
+        for (Long preTaskCode : addPreTaskSet) {
+            TaskDefinition preTaskRelation = taskCodeMap.get(preTaskCode);
+            ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(
+                    null, processDefinition.getVersion(), projectCode, 
processDefinition.getCode(),
+                    preTaskRelation.getCode(), preTaskRelation.getVersion(),
+                    taskDefinition.getCode(), taskDefinition.getVersion(), 
ConditionType.NONE, "{}");
+            addPreTaskList.add(processTaskRelation);
+            
processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser, 
processTaskRelation));
+        }
+        int insert = 0;
+        int remove = 0;
+        int log = 0;
+        // insert process task relation table data
+        if (CollectionUtils.isNotEmpty(addPreTaskList)) {
+            insert = processTaskRelationMapper.batchInsert(addPreTaskList);
+        }
+        if (CollectionUtils.isNotEmpty(removePreTaskList)) {
+            for (ProcessTaskRelation processTaskRelation : removePreTaskList) {
+                remove += 
processTaskRelationMapper.updateById(processTaskRelation);
+            }
+        }
+        if (CollectionUtils.isNotEmpty(processTaskRelationLogList)) {
+            log = 
processTaskRelationLogDao.batchInsert(processTaskRelationLogList);
+        }
+        if (insert + remove != log) {
+            throw new RuntimeException("updateUpstreamTask error");
+        }
+    }
+
+    private ProcessTaskRelationLog createProcessTaskRelationLog(User loginUser,
+                                                                
ProcessTaskRelation processTaskRelation) {
+        Date now = new Date();
+        ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(processTaskRelation);
+        processTaskRelationLog.setOperator(loginUser.getId());
+        processTaskRelationLog.setOperateTime(now);
+        processTaskRelationLog.setCreateTime(now);
+        processTaskRelationLog.setUpdateTime(now);
+        return processTaskRelationLog;
+    }
+
     /**
      * switch task definition
      *
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index a066b81124..391769627f 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -56,12 +56,14 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -124,11 +126,15 @@ public class TaskDefinitionServiceImplTest {
     @Mock
     private ProcessDefinitionService processDefinitionService;
 
+    @Mock
+    private ProcessTaskRelationLogDao processTaskRelationLogDao;
+
     private static final String TASK_PARAMETER =
             "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
     private static final long PROJECT_CODE = 1L;
     private static final long PROCESS_DEFINITION_CODE = 2L;
     private static final long TASK_CODE = 3L;
+    private static final String UPSTREAM_CODE = "3,5";
     private static final int VERSION = 1;
     private static final int RESOURCE_RATE = -1;
     protected User user;
@@ -169,14 +175,7 @@ public class TaskDefinitionServiceImplTest {
 
     @Test
     public void updateTaskDefinition() {
-        String taskDefinitionJson =
-                
"{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
-                        + 
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
-                        + 
"\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
-                        + "\\\"echo 
${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
-                        + 
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
-                        + 
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
-                        + 
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}";
+        String taskDefinitionJson = getTaskDefinitionJson();;
 
         Project project = getProject();
         
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
@@ -591,6 +590,52 @@ public class TaskDefinitionServiceImplTest {
         Assertions.assertDoesNotThrow(() -> 
taskDefinitionService.getTaskDefinition(user, TASK_CODE));
     }
 
+    @Test
+    public void testUpdateTaskWithUpstream() {
+
+        String taskDefinitionJson = getTaskDefinitionJson();
+        TaskDefinition taskDefinition = getTaskDefinition();
+        taskDefinition.setFlag(Flag.NO);
+        TaskDefinition taskDefinitionSecond = getTaskDefinition();
+        taskDefinitionSecond.setCode(5);
+
+        user.setUserType(UserType.ADMIN_USER);
+        
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
+        Mockito.when(projectService.hasProjectAndWritePerm(user, getProject(), 
new HashMap<>())).thenReturn(true);
+        
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
+        
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
+        
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
+        
Mockito.when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1);
+        
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
+
+        Mockito.when(taskDefinitionMapper.queryByCodeList(Mockito.anySet()))
+                .thenReturn(Arrays.asList(taskDefinition, 
taskDefinitionSecond));
+
+        
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, 
TASK_CODE))
+                .thenReturn(getProcessTaskRelationListV2());
+        
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
+        
Mockito.when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1);
+        
Mockito.when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1);
+        
Mockito.when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2);
+        // success
+        Map<String, Object> successMap = 
taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE,
+                taskDefinitionJson, UPSTREAM_CODE);
+        Assertions.assertEquals(Status.SUCCESS, 
successMap.get(Constants.STATUS));
+        user.setUserType(UserType.GENERAL_USER);
+    }
+
+    private String getTaskDefinitionJson() {
+        return 
"{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+                + 
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\","
+                + "\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\","
+                + 
"\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":\\\"echo 
${datetime}\\\","
+                + 
"\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+                + 
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\","
+                + 
"\"flag\":0,\"taskPriority\":0,\"workerGroup\":\"default\",\"failRetryTimes\":0,"
+                + 
"\"failRetryInterval\":0,\"timeoutFlag\":0,\"timeoutNotifyStrategy\":0,\"timeout\":0,"
+                + "\"delayTime\":0,\"resourceIds\":\"\"}";
+    }
+
     /**
      * create admin user
      */
@@ -663,6 +708,27 @@ public class TaskDefinitionServiceImplTest {
         return processTaskRelationList;
     }
 
+    private List<ProcessTaskRelation> getProcessTaskRelationListV2() {
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        fillProcessTaskRelation(processTaskRelation);
+
+        processTaskRelationList.add(processTaskRelation);
+        processTaskRelation = new ProcessTaskRelation();
+        fillProcessTaskRelation(processTaskRelation);
+        processTaskRelation.setPreTaskCode(4L);
+        processTaskRelationList.add(processTaskRelation);
+        return processTaskRelationList;
+    }
+
+    private void fillProcessTaskRelation(ProcessTaskRelation 
processTaskRelation) {
+        processTaskRelation.setProjectCode(PROJECT_CODE);
+        processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+        processTaskRelation.setPreTaskCode(TASK_CODE);
+        processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
+    }
+
     private List<ProcessTaskRelationLog> getProcessTaskRelationLogList() {
         List<ProcessTaskRelationLog> processTaskRelationLogList = new 
ArrayList<>();
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
index 1c1ba25783..d835cdd3ac 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 
+import org.apache.ibatis.annotations.Param;
+
 import java.util.List;
 
 public interface ProcessTaskRelationLogDao {
@@ -26,4 +28,12 @@ public interface ProcessTaskRelationLogDao {
     List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long 
workflowDefinitionCode);
 
     void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+
+    /**
+     * batch insert process task relation
+     *
+     * @param taskRelationList taskRelationList
+     * @return int
+     */
+    int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> 
taskRelationList);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
index d64f67ad7d..223fa5efbc 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
@@ -41,4 +41,9 @@ public class ProcessTaskRelationLogDaoImpl implements 
ProcessTaskRelationLogDao
     public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
         
processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
     }
+
+    @Override
+    public int batchInsert(List<ProcessTaskRelationLog> taskRelationList) {
+        return processTaskRelationLogMapper.batchInsert(taskRelationList);
+    }
 }

Reply via email to