This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae847ba331 [Fix-15036] [API] Fix task definition edit doesn't work (#14801)
ae847ba331 is described below

commit ae847ba331ac6edec3c6b2e7013098bb274a5871
Author: 小可耐 <[email protected]>
AuthorDate: Wed Oct 25 20:25:02 2023 +0800

    [Fix-15036] [API] Fix task definition edit doesn't work (#14801)
    
    * [Bug] [task definition] 修复任务定义修改,工作流无法生效问题
    
    * [Bug] [API] Fix task check apply
    
    * [BUG][API] fix updateTaskDefinition method ut
    
    * [BUG][API] fix updateTaskDefinition method code specification
    
    * [BUG][API] fix one task corresponds to multiple workflow problems
    
    * [BUG][API] queryProcessTaskRelationByTaskCodeAndTaskVersion method adjust
    
    * [BUG][API] fix updateTaskDefinition method update
    
    ---------
    
    Co-authored-by: David Zollo <[email protected]>
    Co-authored-by: 旺阳 <[email protected]>
    Co-authored-by: xiangzihao <[email protected]>
---
 .../service/impl/TaskDefinitionServiceImpl.java    | 68 ++++++++++++++++++----
 .../api/service/TaskDefinitionServiceImplTest.java | 10 +++-
 .../dao/mapper/ProcessTaskRelationMapper.java      | 10 ++++
 .../dao/mapper/ProcessTaskRelationMapper.xml       | 18 +++++-
 4 files changed, 92 insertions(+), 14 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 838744767a..303e3e62b0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -51,6 +51,7 @@ import 
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import 
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -58,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -141,6 +143,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Autowired
     private ProcessDefinitionService processDefinitionService;
 
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
     /**
      * create task definition
      *
@@ -781,20 +786,59 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
                     projectCode, taskCode, 
taskDefinitionToUpdate.getVersion());
         // update process task relation
         List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper
-                .queryByTaskCode(taskDefinitionToUpdate.getCode());
+                
.queryProcessTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(),
+                        taskDefinition.getVersion());
         if (CollectionUtils.isNotEmpty(processTaskRelations)) {
-            for (ProcessTaskRelation processTaskRelation : 
processTaskRelations) {
-                if (taskCode == processTaskRelation.getPreTaskCode()) {
-                    processTaskRelation.setPreTaskVersion(version);
-                } else if (taskCode == processTaskRelation.getPostTaskCode()) {
-                    processTaskRelation.setPostTaskVersion(version);
+            Map<Long, List<ProcessTaskRelation>> processTaskRelationGroupList 
= processTaskRelations.stream()
+                    
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+            for (Map.Entry<Long, List<ProcessTaskRelation>> 
processTaskRelationMap : processTaskRelationGroupList
+                    .entrySet()) {
+                Long processDefinitionCode = processTaskRelationMap.getKey();
+                int processDefinitionVersion =
+                        
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode)
+                                + 1;
+                List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMap.getValue();
+                for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
+                    if (taskCode == processTaskRelation.getPreTaskCode()) {
+                        processTaskRelation.setPreTaskVersion(version);
+                    } else if (taskCode == 
processTaskRelation.getPostTaskCode()) {
+                        processTaskRelation.setPostTaskVersion(version);
+                    }
+                    
processTaskRelation.setProcessDefinitionVersion(processDefinitionVersion);
+                    int updateProcessDefinitionVersionCount =
+                            
processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
+                    if (updateProcessDefinitionVersionCount != 1) {
+                        log.error("batch update process task relation error, 
projectCode:{}, taskDefinitionCode:{}.",
+                                projectCode, taskCode);
+                        putMsg(result, 
Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                        throw new 
ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                    }
+                    ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(processTaskRelation);
+                    processTaskRelationLog.setOperator(loginUser.getId());
+                    processTaskRelationLog.setId(null);
+                    processTaskRelationLog.setOperateTime(now);
+                    int insertProcessTaskRelationLogCount = 
processTaskRelationLogDao.insert(processTaskRelationLog);
+                    if (insertProcessTaskRelationLogCount != 1) {
+                        log.error("batch update process task relation error, 
projectCode:{}, taskDefinitionCode:{}.",
+                                projectCode, taskCode);
+                        putMsg(result, 
Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
+                        throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
+                    }
                 }
-                int count = 
processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
-                if (count != 1) {
-                    log.error("batch update process task relation error, 
projectCode:{}, taskDefinitionCode:{}.",
-                            projectCode, taskCode);
-                    putMsg(result, 
Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
-                    throw new 
ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(processDefinitionCode);
+                processDefinition.setVersion(processDefinitionVersion);
+                processDefinition.setUpdateTime(now);
+                processDefinition.setUserId(loginUser.getId());
+                // update process definition
+                int updateProcessDefinitionCount = 
processDefinitionMapper.updateById(processDefinition);
+                ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+                processDefinitionLog.setOperateTime(now);
+                processDefinitionLog.setId(null);
+                processDefinitionLog.setOperator(loginUser.getId());
+                int insertProcessDefinitionLogCount = 
processDefinitionLogMapper.insert(processDefinitionLog);
+                if ((updateProcessDefinitionCount & 
insertProcessDefinitionLogCount) != 1) {
+                    putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                    throw new 
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
                 }
             }
         }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 26091e7529..2644712074 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -129,6 +129,9 @@ public class TaskDefinitionServiceImplTest {
     @Mock
     private ProcessTaskRelationLogDao processTaskRelationLogDao;
 
+    @Mock
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
     private static final String TASK_PARAMETER =
             "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
     private static final long PROJECT_CODE = 1L;
@@ -188,9 +191,14 @@ public class TaskDefinitionServiceImplTest {
         
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new 
TaskDefinition());
         
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
         
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
+        
Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
+        Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new 
ProcessDefinition());
+        
Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
+        
Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
         
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
         
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
-        
Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
+        
Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE,
 0))
+                .thenReturn(getProcessTaskRelationList2());
         Mockito.when(processTaskRelationMapper
                 
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
         result = taskDefinitionService.updateTaskDefinition(user, 
PROJECT_CODE, TASK_CODE, taskDefinitionJson);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 9b8dfc8736..07ff5c2d86 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -225,4 +225,14 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
 
     void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long 
workflowDefinitionCode,
                                         @Param("workflowDefinitionVersion") 
int workflowDefinitionVersion);
+
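+    /**
+     * Query every process task relation of the process definitions that use the given task as a post task.
+     *
+     * @param taskCode code matched against post_task_code
+     * @param postTaskVersion version matched against post_task_version
+     * @return all relations of the matching process definitions
+     */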
+    List<ProcessTaskRelation> 
queryProcessTaskRelationByTaskCodeAndTaskVersion(@Param("taskCode") long 
taskCode,
+                                                                               
@Param("postTaskVersion") long postTaskVersion);
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 2da8514c55..742d53726e 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -231,7 +231,8 @@
     <update id="updateProcessTaskRelationTaskVersion">
         update t_ds_process_task_relation
         set pre_task_version=#{processTaskRelation.preTaskVersion},
-            post_task_version=#{processTaskRelation.postTaskVersion}
+            post_task_version=#{processTaskRelation.postTaskVersion},
+            
process_definition_version=#{processTaskRelation.processDefinitionVersion}
         where id = #{processTaskRelation.id}
     </update>
 
@@ -240,4 +241,19 @@
         from t_ds_process_task_relation
         where process_definition_code = #{workflowDefinitionCode} and 
process_definition_version = #{workflowDefinitionVersion}
     </delete>
+
+    <select id="queryProcessTaskRelationByTaskCodeAndTaskVersion" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        WHERE process_definition_code in (
+                                            SELECT
+                                            process_definition_code
+                                            FROM
+                                            t_ds_process_task_relation
+                                            WHERE
+                                            post_task_code = #{taskCode}
+                                            and post_task_version = 
#{postTaskVersion}
+                                          )
+    </select>
 </mapper>

Reply via email to