This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1edd2bd  to #6957 implement interface deleteUpstreamRelation 
deleteDownstreamRelation  (#6966)
1edd2bd is described below

commit 1edd2bd0f6702bcea5e3d375982f2444bdac9b65
Author: zwZjut <[email protected]>
AuthorDate: Wed Nov 24 17:12:47 2021 +0800

    to #6957 implement interface deleteUpstreamRelation 
deleteDownstreamRelation  (#6966)
    
    * to #6957
    
    * to #6957
    
    * to #6957
    
    * to #6957: group by process definition code
    
    * to #6957: fix sonar
    
    * to #6957: fix sonar
    
    * to #6957: add ut , add preTaskVersion
    
    * to #6957: fix style
    
    * to #6957: use org.apache.dolphinscheduler.spi.utils.StringUtils
    
    Co-authored-by: honghuo.zw <[email protected]>
---
 .../impl/ProcessTaskRelationServiceImpl.java       | 119 +++++++++++++++++++--
 .../service/ProcessTaskRelationServiceTest.java    |  79 ++++++++++++++
 .../dao/mapper/ProcessTaskRelationMapper.java      |  40 +++++++
 .../dao/mapper/ProcessTaskRelationMapper.xml       |  56 ++++++++++
 4 files changed, 286 insertions(+), 8 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 3d6e4a2..9a476c8 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -35,6 +35,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import org.apache.commons.collections.CollectionUtils;
 
@@ -50,6 +51,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import com.google.common.collect.Lists;
+
 /**
  * process task relation service impl
  */
@@ -202,29 +205,72 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
     /**
      * delete task upstream relation
      *
-     * @param loginUser login user
-     * @param projectCode project code
+     * @param loginUser    login user
+     * @param projectCode  project code
      * @param preTaskCodes the pre task codes, sep ','
-     * @param taskCode the post task code
+     * @param taskCode     the post task code
      * @return delete result code
      */
     @Override
     public Map<String, Object> deleteUpstreamRelation(User loginUser, long 
projectCode, String preTaskCodes, long taskCode) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        if (StringUtils.isEmpty(preTaskCodes)) {
+            putMsg(result,Status.DATA_IS_NULL,"preTaskCodes");
+            return result;
+        }
+        Set<Long> preTaskCodesSet = 
Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
+        Status status = deleteUpstreamRelation(projectCode, 
preTaskCodesSet.toArray(new Long[0]), taskCode);
+        if (status != Status.SUCCESS) {
+            putMsg(result, status);
+        }
+        return result;
     }
 
     /**
      * delete task downstream relation
      *
-     * @param loginUser login user
-     * @param projectCode project code
+     * @param loginUser     login user
+     * @param projectCode   project code
      * @param postTaskCodes the post task codes, sep ','
-     * @param taskCode the pre task code
+     * @param taskCode      the pre task code
      * @return delete result code
      */
     @Override
     public Map<String, Object> deleteDownstreamRelation(User loginUser, long 
projectCode, String postTaskCodes, long taskCode) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        if (StringUtils.isEmpty(postTaskCodes)) {
+            putMsg(result,Status.DATA_IS_NULL,"postTaskCodes");
+            return result;
+        }
+        Set<Long> postTaskCodesSet = 
Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
+        List<Long> deleteFailedCodeList = new ArrayList<>();
+        postTaskCodesSet.stream().forEach(
+                postTaskCode -> {
+                    try {
+                        Status status = deleteUpstreamRelation(projectCode, 
new Long[]{taskCode}, postTaskCode);
+                        if (Status.SUCCESS != status) {
+                            deleteFailedCodeList.add(postTaskCode);
+                        }
+                    } catch (Exception e) {
+                        deleteFailedCodeList.add(postTaskCode);
+                    }
+
+                }
+        );
+        if (!deleteFailedCodeList.isEmpty()) {
+            putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR, 
String.join(",", deleteFailedCodeList.stream().map(o -> o + 
"").collect(Collectors.toList())));
+        }
+        return result;
     }
 
     /**
@@ -328,4 +374,61 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
         };
     }
 
+    /**
+     * delete upstream relation
+     *
+     * @param projectCode  project code
+     * @param preTaskCodes pre task codes
+     * @param taskCode     pre task code
+     * @return status
+     */
+    private Status deleteUpstreamRelation(long projectCode, Long[] 
preTaskCodes, long taskCode) {
+        List<ProcessTaskRelation> upstreamList = 
processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, 
preTaskCodes);
+        if (CollectionUtils.isEmpty(upstreamList)) {
+            return Status.SUCCESS;
+        }
+        Map<Long, List<ProcessTaskRelation>> 
processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream()
+                
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+        // count upstream relation group by process definition code
+        List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = 
processTaskRelationMapper
+                .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, 
processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new 
Long[0]), taskCode);
+
+        List<ProcessTaskRelation> deletes = new ArrayList<>();
+        List<ProcessTaskRelation> updates = new ArrayList<>();
+
+        countListGroupByProcessDefinitionCode.stream().forEach(
+                processDefinitionCodeUpstreamCountMap ->
+                        
processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
+                                o -> {
+                                    Long processDefinitionCode = o.getKey();
+                                    Integer count = o.getValue();
+                                    List<ProcessTaskRelation> 
processTaskRelationList = 
processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
+                                    if (count <= 
processTaskRelationList.size()) {
+                                        ProcessTaskRelation 
processTaskRelation = processTaskRelationList.remove(0);
+                                        if 
(processTaskRelation.getPreTaskCode() != 0) {
+                                            
processTaskRelation.setPreTaskCode(0);
+                                            
processTaskRelation.setPreTaskVersion(0);
+                                            updates.add(processTaskRelation);
+                                        }
+                                    }
+                                    if (!processTaskRelationList.isEmpty()) {
+                                        
deletes.addAll(processTaskRelationList);
+                                    }
+                                }
+                        )
+        );
+
+        int update = 0;
+        if (!updates.isEmpty()) {
+            update = 
processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates);
+        }
+        int delete = 0;
+        if (!deletes.isEmpty()) {
+            delete = 
processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList()));
+        }
+        if (update < 0 || delete < 0) {
+            return Status.DELETE_TASK_PROCESS_RELATION_ERROR;
+        }
+        return Status.SUCCESS;
+    }
 }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
index 758eceb..55efbfe 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -186,15 +186,18 @@ public class ProcessTaskRelationServiceTest {
         processTaskRelationUpstream0.setPreTaskVersion(1);
         processTaskRelationUpstream0.setProjectCode(projectCode);
         processTaskRelationUpstream0.setPreTaskCode(123);
+        processTaskRelationUpstream0.setProcessDefinitionCode(123);
         ProcessTaskRelation processTaskRelationUpstream1 = new 
ProcessTaskRelation();
         processTaskRelationUpstream1.setPostTaskCode(taskCode);
         processTaskRelationUpstream1.setPreTaskVersion(1);
         processTaskRelationUpstream1.setPreTaskCode(123);
+        processTaskRelationUpstream0.setProcessDefinitionCode(124);
         processTaskRelationUpstream1.setProjectCode(projectCode);
         ProcessTaskRelation processTaskRelationUpstream2 = new 
ProcessTaskRelation();
         processTaskRelationUpstream2.setPostTaskCode(taskCode);
         processTaskRelationUpstream2.setPreTaskVersion(2);
         processTaskRelationUpstream1.setPreTaskCode(123);
+        processTaskRelationUpstream0.setProcessDefinitionCode(125);
         processTaskRelationUpstream2.setProjectCode(projectCode);
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
         processTaskRelationList.add(processTaskRelationUpstream0);
@@ -382,4 +385,80 @@ public class ProcessTaskRelationServiceTest {
         Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
         Assert.assertEquals(2, ((List) relation.get("data")).size());
     }
+
+    @Test
+    public void testDeleteDownstreamRelation() {
+        long projectCode = 1L;
+        long taskCode = 2L;
+        Project project = getProject(projectCode);
+        
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+        User loginUser = new User();
+        loginUser.setId(-1);
+        loginUser.setUserType(UserType.GENERAL_USER);
+        Map<String, Object> result = new HashMap<>();
+        putMsg(result, Status.SUCCESS, projectCode);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
+        List<ProcessTaskRelation> processTaskRelationList = 
getProcessTaskUpstreamRelationList(projectCode, taskCode);
+        
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, 
taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
+        List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new 
ArrayList<>();
+        countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() 
{
+            {
+                put(123L, 2);
+            }
+        });
+        countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() 
{
+            {
+                put(124L, 1);
+            }
+        });
+        countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() 
{
+            {
+                put(125L, 3);
+            }
+        });
+        
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
 new Long[]{123L, 124L, 125L}, 
2)).thenReturn(countListGroupByProcessDefinitionCode);
+        
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new
 ArrayList())).thenReturn(3);
+        Mockito.when(processTaskRelationMapper.deleteBatchIds(new 
ArrayList())).thenReturn(3);
+        Map<String, Object> result1 = 
processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, 
"123", taskCode);
+        Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+    }
+
+    @Test
+    public void testDeleteUpstreamRelation() {
+        long projectCode = 1L;
+        long taskCode = 2L;
+        Project project = getProject(projectCode);
+        
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+        User loginUser = new User();
+        loginUser.setId(-1);
+        loginUser.setUserType(UserType.GENERAL_USER);
+        Map<String, Object> result = new HashMap<>();
+        putMsg(result, Status.SUCCESS, projectCode);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
+        List<ProcessTaskRelation> processTaskRelationList = 
getProcessTaskUpstreamRelationList(projectCode, taskCode);
+        
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, 
taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
+        List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new 
ArrayList<>();
+        countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() 
{
+            {
+                put(123L, 2);
+            }
+        });
+        countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() 
{
+            {
+                put(124L, 1);
+            }
+        });
+        countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() 
{
+            {
+                put(125L, 3);
+            }
+        });
+        
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
 new Long[]{123L, 124L, 125L}, 
2)).thenReturn(countListGroupByProcessDefinitionCode);
+        
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new
 ArrayList())).thenReturn(3);
+        Mockito.when(processTaskRelationMapper.deleteBatchIds(new 
ArrayList())).thenReturn(3);
+        Map<String, Object> result1 = 
processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, 
"123", taskCode);
+        Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 6d4c79b..a03f8f0 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -23,6 +23,7 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
+import java.util.Map;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
@@ -102,6 +103,45 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
     List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode);
 
     /**
+     * query task relation by codes
+     *
+     * @param projectCode   projectCode
+     * @param taskCode      taskCode
+     * @param postTaskCodes postTaskCodes list
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode") 
long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") 
Long[] postTaskCodes);
+
+    /**
+     * query task relation by codes
+     *
+     * @param projectCode  projectCode
+     * @param taskCode     taskCode
+     * @param preTaskCodes preTaskCode list
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] 
preTaskCodes);
+
+    /**
+     * count upstream by codes
+     *
+     * @param projectCode projectCode
+     * @param taskCode    taskCode
+     * @param processDefinitionCodes    processDefinitionCodes
+     * @return upstream count list group by process definition code
+     */
+    List<Map<Long, Integer>> 
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long 
projectCode,
+                                                                             
@Param("processDefinitionCodes") Long[] processDefinitionCodes, 
@Param("taskCode") long taskCode);
+
+    /**
+     * batch update process task relation pre task
+     *
+     * @param processTaskRelationList process task relation list
+     * @return update num
+     */
+    int 
batchUpdateProcessTaskRelationPreTask(@Param("processTaskRelationList") 
List<ProcessTaskRelation> processTaskRelationList);
+
+    /**
      * query by code
      *
      * @param projectCode           projectCode
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 11602eb..95a6550 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -92,6 +92,62 @@
         and post_task_code = #{taskCode}
     </select>
 
+    <select id="queryDownstreamByCodes" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        WHERE project_code = #{projectCode}
+        and pre_task_code = #{taskCode}
+        <if test="postTaskCodes != null and postTaskCodes.length != 0">
+            and post_task_code in
+            <foreach collection="postTaskCodes" index="index" item="i" 
open="(" separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+    </select>
+
+    <select id="queryUpstreamByCodes" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        WHERE project_code = #{projectCode}
+        and post_task_code = #{taskCode}
+        <if test="preTaskCodes != null and preTaskCodes.length != 0">
+            and pre_task_code in
+            <foreach collection="preTaskCodes" index="index" item="i" open="(" 
separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+    </select>
+
+    <select id="countUpstreamByCodeGroupByProcessDefinitionCode" 
resultType="java.lang.Integer">
+        select process_definition_code,
+        count(0)
+        from t_ds_process_task_relation
+        WHERE project_code = #{projectCode}
+        and post_task_code = #{taskCode}
+        <if test="processDefinitionCodes != null and 
processDefinitionCodes.length != 0">
+            and process_definition_code in
+            <foreach collection="processDefinitionCodes" index="index" 
item="i" open="(" separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+        group by process_definition_code
+    </select>
+
+    <update id="batchUpdateProcessTaskRelationPreTask" 
parameterType="java.util.List">
+        <foreach collection="processTaskRelationList" 
item="processTaskRelation" index="index" open="" close="" separator=";">
+            update t_ds_process_task_relation
+            <set>
+                pre_task_code=#{processTaskRelation.preTaskCode},
+                pre_task_version=#{processTaskRelation.preTaskVersion}
+            </set>
+            <where>
+                WHERE id = #{processTaskRelation.id}
+            </where>
+        </foreach>
+    </update>
+
     <select id="queryByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
         select
         <include refid="baseSql"/>

Reply via email to