This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 199a84a  [fix #6878]  implement interface queryUpstreamRelation 
queryDownstreamRelation and add UT (#6888)
199a84a is described below

commit 199a84aa059234d76f01fc4ed5daf96abcfeb630
Author: zwZjut <[email protected]>
AuthorDate: Mon Nov 22 11:02:15 2021 +0800

    [fix #6878]  implement interface queryUpstreamRelation 
queryDownstreamRelation and add UT (#6888)
    
    * fix: #6878, implement interface queryUpstreamRelation 
queryDownstreamRelation and add UT
    
    * fix: #6878, implement interface queryUpstreamRelation 
queryDownstreamRelation and add UT
    
    * fix: #6878, add license
    
    * fix: #6878, code style
    
    * fix: #6878, code style
    
    * fix: #6878, fix Duplication
    
    * fix: #6878
    
    * fix: #6878 , fix style
    
    * to #6878: fix style
    
    * to #6878: fix style
    
    * to #6878: add buildTaskDefinition to overwrite equals and hashCode
    
    Co-authored-by: honghuo.zw <[email protected]>
---
 .../impl/ProcessTaskRelationServiceImpl.java       | 110 ++++++-
 .../ProcessTaskRelationControllerTest.java         |  84 ++++++
 .../service/ProcessTaskRelationServiceTest.java    | 317 +++++++++++++++++++++
 .../dao/mapper/ProcessTaskRelationMapper.java      |  18 ++
 .../dao/mapper/ProcessTaskRelationMapper.xml       |  17 ++
 5 files changed, 537 insertions(+), 9 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 915fc21..4d99de7 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -17,14 +17,28 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,8 +63,10 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
     @Autowired
-    private UserMapper userMapper;
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
 
+    @Autowired
+    private UserMapper userMapper;
     /**
      * create process task relation
      *
@@ -126,26 +142,102 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
     /**
      * query task upstream relation
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param taskCode current task code (post task code)
-     * @return process task relation list
+     * @param taskCode    current task code (post task code)
+     * @return the upstream task definitions
      */
     @Override
     public Map<String, Object> queryUpstreamRelation(User loginUser, long 
projectCode, long taskCode) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+        if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+            Set<TaskDefinition> taskDefinitions = processTaskRelationList
+                    .stream()
+                    .map(processTaskRelation -> {
+                        TaskDefinition taskDefinition = buildTaskDefinition();
+                        
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
+                        
taskDefinition.setCode(processTaskRelation.getPreTaskCode());
+                        
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
+                        return taskDefinition;
+                    })
+                    .collect(Collectors.toSet());
+            taskDefinitionLogList = 
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
+        }
+        result.put(Constants.DATA_LIST, taskDefinitionLogList);
+        putMsg(result, Status.SUCCESS);
+        return result;
     }
 
     /**
      * query task downstream relation
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param taskCode pre task code
-     * @return process task relation list
+     * @param taskCode    pre task code
+     * @return the downstream task definitions
      */
     @Override
     public Map<String, Object> queryDownstreamRelation(User loginUser, long 
projectCode, long taskCode) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
+        List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+        if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+            Set<TaskDefinition> taskDefinitions = processTaskRelationList
+                    .stream()
+                    .map(processTaskRelation -> {
+                        TaskDefinition taskDefinition = buildTaskDefinition();
+                        
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
+                        
taskDefinition.setCode(processTaskRelation.getPostTaskCode());
+                        
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
+                        return taskDefinition;
+                    })
+                    .collect(Collectors.toSet());
+            taskDefinitionLogList = 
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
+        }
+        result.put(Constants.DATA_LIST, taskDefinitionLogList);
+        putMsg(result, Status.SUCCESS);
+        return result;
     }
+
+    /**
+     * build task definition
+     *
+     * @return task definition
+     */
+    private TaskDefinition buildTaskDefinition() {
+
+        return new TaskDefinition() {
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) {
+                    return true;
+                }
+                if (!(o instanceof TaskDefinition)) {
+                    return false;
+                }
+                TaskDefinition that = (TaskDefinition) o;
+                return getCode() == that.getCode()
+                        && getVersion() == that.getVersion()
+                        && getProjectCode() == that.getProjectCode();
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(getCode(), getVersion(), getProjectCode());
+            }
+        };
+    }
+
 }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationControllerTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationControllerTest.java
new file mode 100644
index 0000000..ffb478f
--- /dev/null
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationControllerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.controller;
+
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.servlet.MvcResult;
+
+/**
+ * process task relation controller test
+ */
+public class ProcessTaskRelationControllerTest extends AbstractControllerTest {
+
+    @MockBean
+    private ProcessTaskRelationService processTaskRelationService;
+
+    @Test
+    public void testQueryDownstreamRelation() throws Exception {
+        Map<String, Object> mockResult = new HashMap<>();
+        mockResult.put(Constants.STATUS, Status.SUCCESS);
+        
PowerMockito.when(processTaskRelationService.queryDownstreamRelation(Mockito.any(),
 Mockito.anyLong(), Mockito.anyLong()))
+                .thenReturn(mockResult);
+
+        MvcResult mvcResult = 
mockMvc.perform(get("/projects/{projectCode}/process-task-relation/{taskCode}/downstream",
 "1113", "123")
+                .header(SESSION_ID, sessionId))
+                .andExpect(status().isOk())
+                .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+                .andReturn();
+
+        Result result = 
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), 
Result.class);
+        Assert.assertNotNull(result);
+        Assert.assertEquals(Status.SUCCESS.getCode(), 
result.getCode().intValue());
+    }
+
+    @Test
+    public void testQueryUpstreamRelation() throws Exception {
+        Map<String, Object> mockResult = new HashMap<>();
+        mockResult.put(Constants.STATUS, Status.SUCCESS);
+        
PowerMockito.when(processTaskRelationService.queryUpstreamRelation(Mockito.any(),
 Mockito.anyLong(), Mockito.anyLong()))
+                .thenReturn(mockResult);
+
+        MvcResult mvcResult = 
mockMvc.perform(get("/projects/{projectCode}/process-task-relation/{taskCode}/upstream",
 "1113", "123")
+                .header(SESSION_ID, sessionId))
+                .andExpect(status().isOk())
+                .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+                .andReturn();
+
+        Result result = 
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), 
Result.class);
+        Assert.assertNotNull(result);
+        Assert.assertEquals(Status.SUCCESS.getCode(), 
result.getCode().intValue());
+    }
+}
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
new file mode 100644
index 0000000..aaccbd0
--- /dev/null
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import 
org.apache.dolphinscheduler.api.service.impl.ProcessTaskRelationServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * process task instance relation service test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class ProcessTaskRelationServiceTest {
+
+    @InjectMocks
+    ProcessTaskRelationServiceImpl processTaskRelationService;
+
+    @Mock
+    private ProjectMapper projectMapper;
+
+    @Mock
+    private ProjectServiceImpl projectService;
+
+    @Mock
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
+    @Mock
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    /**
+     * get Mock Admin User
+     *
+     * @return admin user
+     */
+    private User getAdminUser() {
+        User loginUser = new User();
+        loginUser.setId(-1);
+        loginUser.setUserName("admin");
+        loginUser.setUserType(UserType.GENERAL_USER);
+        return loginUser;
+    }
+
+    /**
+     * get mock Project
+     *
+     * @param projectCode projectCode
+     * @return Project
+     */
+    private Project getProject(long projectCode) {
+        Project project = new Project();
+        project.setCode(projectCode);
+        project.setId(1);
+        project.setName("project_test1");
+        project.setUserId(1);
+        return project;
+    }
+
+    private void putMsg(Map<String, Object> result, Status status, Object... 
statusParams) {
+        result.put(Constants.STATUS, status);
+        if (statusParams != null && statusParams.length > 0) {
+            result.put(Constants.MSG, MessageFormat.format(status.getMsg(), 
statusParams));
+        } else {
+            result.put(Constants.MSG, status.getMsg());
+        }
+    }
+
+    private TaskDefinitionLog buildTaskDefinitionLog(long projectCode, long 
code, int version) {
+
+        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog() {
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) {
+                    return true;
+                }
+                if (!(o instanceof TaskDefinitionLog)) {
+                    return false;
+                }
+                TaskDefinitionLog that = (TaskDefinitionLog) o;
+                return getCode() == that.getCode()
+                        && getVersion() == that.getVersion()
+                        && getProjectCode() == that.getProjectCode();
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(getCode(), getVersion(), getProjectCode());
+            }
+        };
+        taskDefinitionLog.setProjectCode(projectCode);
+        taskDefinitionLog.setCode(code);
+        taskDefinitionLog.setVersion(version);
+        return taskDefinitionLog;
+    }
+
+    private TaskDefinition buildTaskDefinition(long projectCode, long code, 
int version) {
+
+        TaskDefinition taskDefinition = new TaskDefinition() {
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) {
+                    return true;
+                }
+                if (!(o instanceof TaskDefinition)) {
+                    return false;
+                }
+                TaskDefinition that = (TaskDefinition) o;
+                return getCode() == that.getCode()
+                        && getVersion() == that.getVersion()
+                        && getProjectCode() == that.getProjectCode();
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(getCode(), getVersion(), getProjectCode());
+            }
+        };
+        taskDefinition.setProjectCode(projectCode);
+        taskDefinition.setCode(code);
+        taskDefinition.setVersion(version);
+        return taskDefinition;
+    }
+
+    private List<ProcessTaskRelation> getProcessTaskUpstreamRelationList(long 
projectCode, long taskCode) {
+        ProcessTaskRelation processTaskRelationUpstream0 = new 
ProcessTaskRelation();
+        processTaskRelationUpstream0.setPostTaskCode(taskCode);
+        processTaskRelationUpstream0.setPreTaskVersion(1);
+        processTaskRelationUpstream0.setProjectCode(projectCode);
+        processTaskRelationUpstream0.setPreTaskCode(123);
+        ProcessTaskRelation processTaskRelationUpstream1 = new 
ProcessTaskRelation();
+        processTaskRelationUpstream1.setPostTaskCode(taskCode);
+        processTaskRelationUpstream1.setPreTaskVersion(1);
+        processTaskRelationUpstream1.setPreTaskCode(123);
+        processTaskRelationUpstream1.setProjectCode(projectCode);
+        ProcessTaskRelation processTaskRelationUpstream2 = new 
ProcessTaskRelation();
+        processTaskRelationUpstream2.setPostTaskCode(taskCode);
+        processTaskRelationUpstream2.setPreTaskVersion(2);
+        processTaskRelationUpstream1.setPreTaskCode(123);
+        processTaskRelationUpstream2.setProjectCode(projectCode);
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+        processTaskRelationList.add(processTaskRelationUpstream0);
+        processTaskRelationList.add(processTaskRelationUpstream1);
+        processTaskRelationList.add(processTaskRelationUpstream2);
+        return processTaskRelationList;
+    }
+
+    private List<ProcessTaskRelation> 
getProcessTaskDownstreamRelationList(long projectCode,long taskCode) {
+        ProcessTaskRelation processTaskRelationDownstream0 = new 
ProcessTaskRelation();
+        processTaskRelationDownstream0.setPreTaskCode(taskCode);
+        processTaskRelationDownstream0.setPostTaskCode(456);
+        processTaskRelationDownstream0.setPostTaskVersion(1);
+        processTaskRelationDownstream0.setProjectCode(projectCode);
+        ProcessTaskRelation processTaskRelationDownstream1 = new 
ProcessTaskRelation();
+        processTaskRelationDownstream1.setPreTaskCode(taskCode);
+        processTaskRelationDownstream1.setPostTaskCode(456);
+        processTaskRelationDownstream1.setPostTaskVersion(1);
+        processTaskRelationDownstream1.setProjectCode(projectCode);
+        ProcessTaskRelation processTaskRelationDownstream2 = new 
ProcessTaskRelation();
+        processTaskRelationDownstream2.setPreTaskCode(taskCode);
+        processTaskRelationDownstream2.setPostTaskCode(4567);
+        processTaskRelationDownstream2.setPostTaskVersion(1);
+        processTaskRelationDownstream2.setProjectCode(projectCode);
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+        processTaskRelationList.add(processTaskRelationDownstream0);
+        processTaskRelationList.add(processTaskRelationDownstream1);
+        processTaskRelationList.add(processTaskRelationDownstream2);
+        return processTaskRelationList;
+    }
+
+    @Test
+    public void testQueryDownstreamRelation() {
+        long projectCode = 1L;
+        long taskCode = 2L;
+
+        Project project = getProject(projectCode);
+        
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+        User loginUser = new User();
+        loginUser.setId(-1);
+        loginUser.setUserType(UserType.GENERAL_USER);
+
+        Map<String, Object> result = new HashMap<>();
+        putMsg(result, Status.SUCCESS, projectCode);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
+
+        List<ProcessTaskRelation> processTaskRelationList = 
getProcessTaskDownstreamRelationList(projectCode,taskCode);
+
+        
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode,taskCode))
+                .thenReturn(processTaskRelationList);
+
+        if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+            Set<TaskDefinition> taskDefinitions = processTaskRelationList
+                    .stream()
+                    .map(processTaskRelation -> {
+                        TaskDefinition taskDefinition = buildTaskDefinition(
+                                processTaskRelation.getProjectCode(),
+                                processTaskRelation.getPostTaskCode(),
+                                processTaskRelation.getPostTaskVersion());
+                        return taskDefinition;
+                    })
+                    .collect(Collectors.toSet());
+
+            Set<TaskDefinitionLog> taskDefinitionLogSet = 
processTaskRelationList
+                    .stream()
+                    .map(processTaskRelation -> {
+                        TaskDefinitionLog taskDefinitionLog = 
buildTaskDefinitionLog(
+                                processTaskRelation.getProjectCode(),
+                                processTaskRelation.getPostTaskCode(),
+                                processTaskRelation.getPostTaskVersion()
+                        );
+                        return taskDefinitionLog;
+                    })
+                    .collect(Collectors.toSet());
+            List<TaskDefinitionLog> taskDefinitionLogList = 
taskDefinitionLogSet.stream().collect(Collectors.toList());
+            
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions))
+                    .thenReturn(taskDefinitionLogList);
+        }
+        Map<String, Object> relation = processTaskRelationService
+                .queryDownstreamRelation(loginUser, projectCode, taskCode);
+        Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
+        Assert.assertEquals(2, ((List) relation.get("data")).size());
+    }
+
+    @Test
+    public void testQueryUpstreamRelation() {
+        long projectCode = 1L;
+        long taskCode = 2L;
+
+        Project project = getProject(projectCode);
+        
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+        User loginUser = new User();
+        loginUser.setId(-1);
+        loginUser.setUserType(UserType.GENERAL_USER);
+
+        Map<String, Object> result = new HashMap<>();
+        putMsg(result, Status.SUCCESS, projectCode);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
+        List<ProcessTaskRelation> processTaskRelationList = 
getProcessTaskUpstreamRelationList(projectCode,taskCode);
+
+        
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, 
taskCode)).thenReturn(processTaskRelationList);
+
+        if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+            Set<TaskDefinition> taskDefinitions = processTaskRelationList
+                    .stream()
+                    .map(processTaskRelation -> {
+                        TaskDefinition taskDefinition = buildTaskDefinition(
+                                processTaskRelation.getProjectCode(),
+                                processTaskRelation.getPreTaskCode(),
+                                processTaskRelation.getPreTaskVersion());
+                        return taskDefinition;
+                    })
+                    .collect(Collectors.toSet());
+
+            Set<TaskDefinitionLog> taskDefinitionLogSet = 
processTaskRelationList
+                    .stream()
+                    .map(processTaskRelation -> {
+                        TaskDefinitionLog taskDefinitionLog = 
buildTaskDefinitionLog(
+                                processTaskRelation.getProjectCode(),
+                                processTaskRelation.getPreTaskCode(),
+                                processTaskRelation.getPreTaskVersion());
+                        return taskDefinitionLog;
+                    })
+                    .collect(Collectors.toSet());
+            List<TaskDefinitionLog> taskDefinitionLogList = 
taskDefinitionLogSet.stream().collect(Collectors.toList());
+            
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions))
+                    .thenReturn(taskDefinitionLogList);
+        }
+        Map<String, Object> relation = processTaskRelationService
+                .queryUpstreamRelation(loginUser, projectCode, taskCode);
+        Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
+        Assert.assertEquals(2, ((List) relation.get("data")).size());
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index c5e9969..3dc3d58 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -82,4 +82,22 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
      * @return ProcessTaskRelation
      */
     List<ProcessTaskRelation> queryDownstreamByTaskCode(@Param("taskCode") 
long taskCode);
+
+    /**
+     * query upstream process task relation by taskCode
+     *
+     * @param projectCode projectCode
+     * @param taskCode    taskCode
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode);
+
+    /**
+     * query downstream process task relation by taskCode
+     *
+     * @param projectCode projectCode
+     * @param taskCode    taskCode
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index d962d6a..d19dec6 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -75,4 +75,21 @@
         from t_ds_process_task_relation
         WHERE pre_task_code = #{taskCode}
     </select>
+
+    <select id="queryDownstreamByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        WHERE project_code = #{projectCode}
+        and pre_task_code = #{taskCode}
+    </select>
+
+    <select id="queryUpstreamByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        WHERE project_code = #{projectCode}
+        and post_task_code = #{taskCode}
+    </select>
+
 </mapper>

Reply via email to