This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 199a84a [fix #6878] implement interface queryUpstreamRelation
queryDownstreamRelation and add UT (#6888)
199a84a is described below
commit 199a84aa059234d76f01fc4ed5daf96abcfeb630
Author: zwZjut <[email protected]>
AuthorDate: Mon Nov 22 11:02:15 2021 +0800
[fix #6878] implement interface queryUpstreamRelation
queryDownstreamRelation and add UT (#6888)
* fix: #6878, implement interface queryUpstreamRelation
queryDownstreamRelation and add UT
* fix: #6878, implement interface queryUpstreamRelation
queryDownstreamRelation and add UT
* fix: #6878, add license
* fix: #6878, code style
* fix: #6878, code style
* fix: #6878, fix Duplication
* fix: #6878
* fix: #6878 , fix style
* to #6878: fix style
* to #6878: fix style
* to #6878: add buildTaskDefinition to overwrite equals and hashCode
Co-authored-by: honghuo.zw <[email protected]>
---
.../impl/ProcessTaskRelationServiceImpl.java | 110 ++++++-
.../ProcessTaskRelationControllerTest.java | 84 ++++++
.../service/ProcessTaskRelationServiceTest.java | 317 +++++++++++++++++++++
.../dao/mapper/ProcessTaskRelationMapper.java | 18 ++
.../dao/mapper/ProcessTaskRelationMapper.xml | 17 ++
5 files changed, 537 insertions(+), 9 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 915fc21..4d99de7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -17,14 +17,28 @@
package org.apache.dolphinscheduler.api.service.impl;
+import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +63,10 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
- private UserMapper userMapper;
+ private TaskDefinitionLogMapper taskDefinitionLogMapper;
+ @Autowired
+ private UserMapper userMapper;
/**
* create process task relation
*
@@ -126,26 +142,102 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
/**
* query task upstream relation
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param taskCode current task code (post task code)
- * @return process task relation list
+ * @param taskCode current task code (post task code)
+ * @return the upstream task definitions
*/
@Override
public Map<String, Object> queryUpstreamRelation(User loginUser, long
projectCode, long taskCode) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+ Set<TaskDefinition> taskDefinitions = processTaskRelationList
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinition taskDefinition = buildTaskDefinition();
+
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
+
taskDefinition.setCode(processTaskRelation.getPreTaskCode());
+
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
+ return taskDefinition;
+ })
+ .collect(Collectors.toSet());
+ taskDefinitionLogList =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
+ }
+ result.put(Constants.DATA_LIST, taskDefinitionLogList);
+ putMsg(result, Status.SUCCESS);
+ return result;
}
/**
* query task downstream relation
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param taskCode pre task code
- * @return process task relation list
+ * @param taskCode pre task code
+ * @return the downstream task definitions
*/
@Override
public Map<String, Object> queryDownstreamRelation(User loginUser, long
projectCode, long taskCode) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
+ List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+ Set<TaskDefinition> taskDefinitions = processTaskRelationList
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinition taskDefinition = buildTaskDefinition();
+
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
+
taskDefinition.setCode(processTaskRelation.getPostTaskCode());
+
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
+ return taskDefinition;
+ })
+ .collect(Collectors.toSet());
+ taskDefinitionLogList =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
+ }
+ result.put(Constants.DATA_LIST, taskDefinitionLogList);
+ putMsg(result, Status.SUCCESS);
+ return result;
}
+
+ /**
+ * build task definition
+ *
+ * @return task definition
+ */
+ private TaskDefinition buildTaskDefinition() {
+
+ return new TaskDefinition() {
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TaskDefinition)) {
+ return false;
+ }
+ TaskDefinition that = (TaskDefinition) o;
+ return getCode() == that.getCode()
+ && getVersion() == that.getVersion()
+ && getProjectCode() == that.getProjectCode();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getCode(), getVersion(), getProjectCode());
+ }
+ };
+ }
+
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationControllerTest.java
new file mode 100644
index 0000000..ffb478f
--- /dev/null
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationControllerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.controller;
+
+import static
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.servlet.MvcResult;
+
+/**
+ * process task relation controller test
+ */
+public class ProcessTaskRelationControllerTest extends AbstractControllerTest {
+
+ @MockBean
+ private ProcessTaskRelationService processTaskRelationService;
+
+ @Test
+ public void testQueryDownstreamRelation() throws Exception {
+ Map<String, Object> mockResult = new HashMap<>();
+ mockResult.put(Constants.STATUS, Status.SUCCESS);
+
PowerMockito.when(processTaskRelationService.queryDownstreamRelation(Mockito.any(),
Mockito.anyLong(), Mockito.anyLong()))
+ .thenReturn(mockResult);
+
+ MvcResult mvcResult =
mockMvc.perform(get("/projects/{projectCode}/process-task-relation/{taskCode}/downstream",
"1113", "123")
+ .header(SESSION_ID, sessionId))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+ .andReturn();
+
+ Result result =
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(),
Result.class);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(Status.SUCCESS.getCode(),
result.getCode().intValue());
+ }
+
+ @Test
+ public void testQueryUpstreamRelation() throws Exception {
+ Map<String, Object> mockResult = new HashMap<>();
+ mockResult.put(Constants.STATUS, Status.SUCCESS);
+
PowerMockito.when(processTaskRelationService.queryUpstreamRelation(Mockito.any(),
Mockito.anyLong(), Mockito.anyLong()))
+ .thenReturn(mockResult);
+
+ MvcResult mvcResult =
mockMvc.perform(get("/projects/{projectCode}/process-task-relation/{taskCode}/upstream",
"1113", "123")
+ .header(SESSION_ID, sessionId))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+ .andReturn();
+
+ Result result =
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(),
Result.class);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(Status.SUCCESS.getCode(),
result.getCode().intValue());
+ }
+}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
new file mode 100644
index 0000000..aaccbd0
--- /dev/null
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import
org.apache.dolphinscheduler.api.service.impl.ProcessTaskRelationServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * process task relation service test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class ProcessTaskRelationServiceTest {
+
+ @InjectMocks
+ ProcessTaskRelationServiceImpl processTaskRelationService;
+
+ @Mock
+ private ProjectMapper projectMapper;
+
+ @Mock
+ private ProjectServiceImpl projectService;
+
+ @Mock
+ private ProcessTaskRelationMapper processTaskRelationMapper;
+
+ @Mock
+ private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+ /**
+ * get Mock Admin User
+ *
+ * @return admin user
+ */
+ private User getAdminUser() {
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserName("admin");
+ loginUser.setUserType(UserType.GENERAL_USER);
+ return loginUser;
+ }
+
+ /**
+ * get mock Project
+ *
+ * @param projectCode projectCode
+ * @return Project
+ */
+ private Project getProject(long projectCode) {
+ Project project = new Project();
+ project.setCode(projectCode);
+ project.setId(1);
+ project.setName("project_test1");
+ project.setUserId(1);
+ return project;
+ }
+
+ private void putMsg(Map<String, Object> result, Status status, Object...
statusParams) {
+ result.put(Constants.STATUS, status);
+ if (statusParams != null && statusParams.length > 0) {
+ result.put(Constants.MSG, MessageFormat.format(status.getMsg(),
statusParams));
+ } else {
+ result.put(Constants.MSG, status.getMsg());
+ }
+ }
+
+ private TaskDefinitionLog buildTaskDefinitionLog(long projectCode, long
code, int version) {
+
+ TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog() {
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TaskDefinitionLog)) {
+ return false;
+ }
+ TaskDefinitionLog that = (TaskDefinitionLog) o;
+ return getCode() == that.getCode()
+ && getVersion() == that.getVersion()
+ && getProjectCode() == that.getProjectCode();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getCode(), getVersion(), getProjectCode());
+ }
+ };
+ taskDefinitionLog.setProjectCode(projectCode);
+ taskDefinitionLog.setCode(code);
+ taskDefinitionLog.setVersion(version);
+ return taskDefinitionLog;
+ }
+
+ private TaskDefinition buildTaskDefinition(long projectCode, long code,
int version) {
+
+ TaskDefinition taskDefinition = new TaskDefinition() {
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TaskDefinition)) {
+ return false;
+ }
+ TaskDefinition that = (TaskDefinition) o;
+ return getCode() == that.getCode()
+ && getVersion() == that.getVersion()
+ && getProjectCode() == that.getProjectCode();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getCode(), getVersion(), getProjectCode());
+ }
+ };
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setCode(code);
+ taskDefinition.setVersion(version);
+ return taskDefinition;
+ }
+
+ private List<ProcessTaskRelation> getProcessTaskUpstreamRelationList(long
projectCode, long taskCode) {
+ ProcessTaskRelation processTaskRelationUpstream0 = new
ProcessTaskRelation();
+ processTaskRelationUpstream0.setPostTaskCode(taskCode);
+ processTaskRelationUpstream0.setPreTaskVersion(1);
+ processTaskRelationUpstream0.setProjectCode(projectCode);
+ processTaskRelationUpstream0.setPreTaskCode(123);
+ ProcessTaskRelation processTaskRelationUpstream1 = new
ProcessTaskRelation();
+ processTaskRelationUpstream1.setPostTaskCode(taskCode);
+ processTaskRelationUpstream1.setPreTaskVersion(1);
+ processTaskRelationUpstream1.setPreTaskCode(123);
+ processTaskRelationUpstream1.setProjectCode(projectCode);
+ ProcessTaskRelation processTaskRelationUpstream2 = new
ProcessTaskRelation();
+ processTaskRelationUpstream2.setPostTaskCode(taskCode);
+ processTaskRelationUpstream2.setPreTaskVersion(2);
+ processTaskRelationUpstream1.setPreTaskCode(123);
+ processTaskRelationUpstream2.setProjectCode(projectCode);
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+ processTaskRelationList.add(processTaskRelationUpstream0);
+ processTaskRelationList.add(processTaskRelationUpstream1);
+ processTaskRelationList.add(processTaskRelationUpstream2);
+ return processTaskRelationList;
+ }
+
+ private List<ProcessTaskRelation>
getProcessTaskDownstreamRelationList(long projectCode,long taskCode) {
+ ProcessTaskRelation processTaskRelationDownstream0 = new
ProcessTaskRelation();
+ processTaskRelationDownstream0.setPreTaskCode(taskCode);
+ processTaskRelationDownstream0.setPostTaskCode(456);
+ processTaskRelationDownstream0.setPostTaskVersion(1);
+ processTaskRelationDownstream0.setProjectCode(projectCode);
+ ProcessTaskRelation processTaskRelationDownstream1 = new
ProcessTaskRelation();
+ processTaskRelationDownstream1.setPreTaskCode(taskCode);
+ processTaskRelationDownstream1.setPostTaskCode(456);
+ processTaskRelationDownstream1.setPostTaskVersion(1);
+ processTaskRelationDownstream1.setProjectCode(projectCode);
+ ProcessTaskRelation processTaskRelationDownstream2 = new
ProcessTaskRelation();
+ processTaskRelationDownstream2.setPreTaskCode(taskCode);
+ processTaskRelationDownstream2.setPostTaskCode(4567);
+ processTaskRelationDownstream2.setPostTaskVersion(1);
+ processTaskRelationDownstream2.setProjectCode(projectCode);
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+ processTaskRelationList.add(processTaskRelationDownstream0);
+ processTaskRelationList.add(processTaskRelationDownstream1);
+ processTaskRelationList.add(processTaskRelationDownstream2);
+ return processTaskRelationList;
+ }
+
+ @Test
+ public void testQueryDownstreamRelation() {
+ long projectCode = 1L;
+ long taskCode = 2L;
+
+ Project project = getProject(projectCode);
+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.SUCCESS, projectCode);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
+
+ List<ProcessTaskRelation> processTaskRelationList =
getProcessTaskDownstreamRelationList(projectCode,taskCode);
+
+
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode,taskCode))
+ .thenReturn(processTaskRelationList);
+
+ if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+ Set<TaskDefinition> taskDefinitions = processTaskRelationList
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinition taskDefinition = buildTaskDefinition(
+ processTaskRelation.getProjectCode(),
+ processTaskRelation.getPostTaskCode(),
+ processTaskRelation.getPostTaskVersion());
+ return taskDefinition;
+ })
+ .collect(Collectors.toSet());
+
+ Set<TaskDefinitionLog> taskDefinitionLogSet =
processTaskRelationList
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinitionLog taskDefinitionLog =
buildTaskDefinitionLog(
+ processTaskRelation.getProjectCode(),
+ processTaskRelation.getPostTaskCode(),
+ processTaskRelation.getPostTaskVersion()
+ );
+ return taskDefinitionLog;
+ })
+ .collect(Collectors.toSet());
+ List<TaskDefinitionLog> taskDefinitionLogList =
taskDefinitionLogSet.stream().collect(Collectors.toList());
+
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions))
+ .thenReturn(taskDefinitionLogList);
+ }
+ Map<String, Object> relation = processTaskRelationService
+ .queryDownstreamRelation(loginUser, projectCode, taskCode);
+ Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
+ Assert.assertEquals(2, ((List) relation.get("data")).size());
+ }
+
+ @Test
+ public void testQueryUpstreamRelation() {
+ long projectCode = 1L;
+ long taskCode = 2L;
+
+ Project project = getProject(projectCode);
+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.SUCCESS, projectCode);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
+ List<ProcessTaskRelation> processTaskRelationList =
getProcessTaskUpstreamRelationList(projectCode,taskCode);
+
+
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode)).thenReturn(processTaskRelationList);
+
+ if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
+ Set<TaskDefinition> taskDefinitions = processTaskRelationList
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinition taskDefinition = buildTaskDefinition(
+ processTaskRelation.getProjectCode(),
+ processTaskRelation.getPreTaskCode(),
+ processTaskRelation.getPreTaskVersion());
+ return taskDefinition;
+ })
+ .collect(Collectors.toSet());
+
+ Set<TaskDefinitionLog> taskDefinitionLogSet =
processTaskRelationList
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinitionLog taskDefinitionLog =
buildTaskDefinitionLog(
+ processTaskRelation.getProjectCode(),
+ processTaskRelation.getPreTaskCode(),
+ processTaskRelation.getPreTaskVersion());
+ return taskDefinitionLog;
+ })
+ .collect(Collectors.toSet());
+ List<TaskDefinitionLog> taskDefinitionLogList =
taskDefinitionLogSet.stream().collect(Collectors.toList());
+
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions))
+ .thenReturn(taskDefinitionLogList);
+ }
+ Map<String, Object> relation = processTaskRelationService
+ .queryUpstreamRelation(loginUser, projectCode, taskCode);
+ Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
+ Assert.assertEquals(2, ((List) relation.get("data")).size());
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index c5e9969..3dc3d58 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -82,4 +82,22 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByTaskCode(@Param("taskCode")
long taskCode);
+
+ /**
+ * query upstream process task relations by projectCode and taskCode
+ *
+ * @param projectCode projectCode
+ * @param taskCode code of the current task (matched against post_task_code)
+ * @return list of ProcessTaskRelation
+ */
+ List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
+
+ /**
+ * query downstream process task relations by projectCode and taskCode
+ *
+ * @param projectCode projectCode
+ * @param taskCode code of the current task (matched against pre_task_code)
+ * @return list of ProcessTaskRelation
+ */
+ List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index d962d6a..d19dec6 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -75,4 +75,21 @@
from t_ds_process_task_relation
WHERE pre_task_code = #{taskCode}
</select>
+
+ <select id="queryDownstreamByCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ WHERE project_code = #{projectCode}
+ and pre_task_code = #{taskCode}
+ </select>
+
+ <select id="queryUpstreamByCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ WHERE project_code = #{projectCode}
+ and post_task_code = #{taskCode}
+ </select>
+
</mapper>