This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1edd2bd to #6957 implement interface deleteUpstreamRelation
deleteDownstreamRelation (#6966)
1edd2bd is described below
commit 1edd2bd0f6702bcea5e3d375982f2444bdac9b65
Author: zwZjut <[email protected]>
AuthorDate: Wed Nov 24 17:12:47 2021 +0800
to #6957 implement interface deleteUpstreamRelation
deleteDownstreamRelation (#6966)
* to #6957
* to #6957
* to #6957
* to #6957: group by process definition code
* to #6957: fix sonar
* to #6957: fix sonar
* to #6957: add ut , add preTaskVersion
* to #6957: fix style
* to #6957: use org.apache.dolphinscheduler.spi.utils.StringUtils
Co-authored-by: honghuo.zw <[email protected]>
---
.../impl/ProcessTaskRelationServiceImpl.java | 119 +++++++++++++++++++--
.../service/ProcessTaskRelationServiceTest.java | 79 ++++++++++++++
.../dao/mapper/ProcessTaskRelationMapper.java | 40 +++++++
.../dao/mapper/ProcessTaskRelationMapper.xml | 56 ++++++++++
4 files changed, 286 insertions(+), 8 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 3d6e4a2..9a476c8 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -35,6 +35,7 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -50,6 +51,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import com.google.common.collect.Lists;
+
/**
* process task relation service impl
*/
@@ -202,29 +205,72 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
/**
* delete task upstream relation
*
- * @param loginUser login user
- * @param projectCode project code
+ * @param loginUser login user
+ * @param projectCode project code
* @param preTaskCodes the pre task codes, sep ','
- * @param taskCode the post task code
+ * @param taskCode the post task code
* @return delete result code
*/
@Override
public Map<String, Object> deleteUpstreamRelation(User loginUser, long
projectCode, String preTaskCodes, long taskCode) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ if (StringUtils.isEmpty(preTaskCodes)) {
+ putMsg(result,Status.DATA_IS_NULL,"preTaskCodes");
+ return result;
+ }
+ Set<Long> preTaskCodesSet =
Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
+ Status status = deleteUpstreamRelation(projectCode,
preTaskCodesSet.toArray(new Long[0]), taskCode);
+ if (status != Status.SUCCESS) {
+ putMsg(result, status);
+ }
+ return result;
}
/**
* delete task downstream relation
*
- * @param loginUser login user
- * @param projectCode project code
+ * @param loginUser login user
+ * @param projectCode project code
* @param postTaskCodes the post task codes, sep ','
- * @param taskCode the pre task code
+ * @param taskCode the pre task code
* @return delete result code
*/
@Override
public Map<String, Object> deleteDownstreamRelation(User loginUser, long
projectCode, String postTaskCodes, long taskCode) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ if (StringUtils.isEmpty(postTaskCodes)) {
+ putMsg(result,Status.DATA_IS_NULL,"postTaskCodes");
+ return result;
+ }
+ Set<Long> postTaskCodesSet =
Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
+ List<Long> deleteFailedCodeList = new ArrayList<>();
+ postTaskCodesSet.stream().forEach(
+ postTaskCode -> {
+ try {
+ Status status = deleteUpstreamRelation(projectCode,
new Long[]{taskCode}, postTaskCode);
+ if (Status.SUCCESS != status) {
+ deleteFailedCodeList.add(postTaskCode);
+ }
+ } catch (Exception e) {
+ deleteFailedCodeList.add(postTaskCode);
+ }
+
+ }
+ );
+ if (!deleteFailedCodeList.isEmpty()) {
+ putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR,
String.join(",", deleteFailedCodeList.stream().map(o -> o +
"").collect(Collectors.toList())));
+ }
+ return result;
}
/**
@@ -328,4 +374,61 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
};
}
+ /**
+ * delete upstream relation
+ *
+ * @param projectCode project code
+ * @param preTaskCodes pre task codes
+ * @param taskCode pre task code
+ * @return status
+ */
+ private Status deleteUpstreamRelation(long projectCode, Long[]
preTaskCodes, long taskCode) {
+ List<ProcessTaskRelation> upstreamList =
processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode,
preTaskCodes);
+ if (CollectionUtils.isEmpty(upstreamList)) {
+ return Status.SUCCESS;
+ }
+ Map<Long, List<ProcessTaskRelation>>
processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream()
+
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+ // count upstream relation group by process definition code
+ List<Map<Long, Integer>> countListGroupByProcessDefinitionCode =
processTaskRelationMapper
+ .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new
Long[0]), taskCode);
+
+ List<ProcessTaskRelation> deletes = new ArrayList<>();
+ List<ProcessTaskRelation> updates = new ArrayList<>();
+
+ countListGroupByProcessDefinitionCode.stream().forEach(
+ processDefinitionCodeUpstreamCountMap ->
+
processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
+ o -> {
+ Long processDefinitionCode = o.getKey();
+ Integer count = o.getValue();
+ List<ProcessTaskRelation>
processTaskRelationList =
processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
+ if (count <=
processTaskRelationList.size()) {
+ ProcessTaskRelation
processTaskRelation = processTaskRelationList.remove(0);
+ if
(processTaskRelation.getPreTaskCode() != 0) {
+
processTaskRelation.setPreTaskCode(0);
+
processTaskRelation.setPreTaskVersion(0);
+ updates.add(processTaskRelation);
+ }
+ }
+ if (!processTaskRelationList.isEmpty()) {
+
deletes.addAll(processTaskRelationList);
+ }
+ }
+ )
+ );
+
+ int update = 0;
+ if (!updates.isEmpty()) {
+ update =
processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates);
+ }
+ int delete = 0;
+ if (!deletes.isEmpty()) {
+ delete =
processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList()));
+ }
+ if (update < 0 || delete < 0) {
+ return Status.DELETE_TASK_PROCESS_RELATION_ERROR;
+ }
+ return Status.SUCCESS;
+ }
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
index 758eceb..55efbfe 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -186,15 +186,18 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationUpstream0.setPreTaskVersion(1);
processTaskRelationUpstream0.setProjectCode(projectCode);
processTaskRelationUpstream0.setPreTaskCode(123);
+ processTaskRelationUpstream0.setProcessDefinitionCode(123);
ProcessTaskRelation processTaskRelationUpstream1 = new
ProcessTaskRelation();
processTaskRelationUpstream1.setPostTaskCode(taskCode);
processTaskRelationUpstream1.setPreTaskVersion(1);
processTaskRelationUpstream1.setPreTaskCode(123);
+ processTaskRelationUpstream0.setProcessDefinitionCode(124);
processTaskRelationUpstream1.setProjectCode(projectCode);
ProcessTaskRelation processTaskRelationUpstream2 = new
ProcessTaskRelation();
processTaskRelationUpstream2.setPostTaskCode(taskCode);
processTaskRelationUpstream2.setPreTaskVersion(2);
processTaskRelationUpstream1.setPreTaskCode(123);
+ processTaskRelationUpstream0.setProcessDefinitionCode(125);
processTaskRelationUpstream2.setProjectCode(projectCode);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelationUpstream0);
@@ -382,4 +385,80 @@ public class ProcessTaskRelationServiceTest {
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
Assert.assertEquals(2, ((List) relation.get("data")).size());
}
+
+ @Test
+ public void testDeleteDownstreamRelation() {
+ long projectCode = 1L;
+ long taskCode = 2L;
+ Project project = getProject(projectCode);
+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.SUCCESS, projectCode);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
+ List<ProcessTaskRelation> processTaskRelationList =
getProcessTaskUpstreamRelationList(projectCode, taskCode);
+
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode,
taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
+ List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new
ArrayList<>();
+ countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ {
+ put(123L, 2);
+ }
+ });
+ countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ {
+ put(124L, 1);
+ }
+ });
+ countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ {
+ put(125L, 3);
+ }
+ });
+
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
new Long[]{123L, 124L, 125L},
2)).thenReturn(countListGroupByProcessDefinitionCode);
+
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new
ArrayList())).thenReturn(3);
+ Mockito.when(processTaskRelationMapper.deleteBatchIds(new
ArrayList())).thenReturn(3);
+ Map<String, Object> result1 =
processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode,
"123", taskCode);
+ Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+ }
+
+ @Test
+ public void testDeleteUpstreamRelation() {
+ long projectCode = 1L;
+ long taskCode = 2L;
+ Project project = getProject(projectCode);
+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.SUCCESS, projectCode);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
+ List<ProcessTaskRelation> processTaskRelationList =
getProcessTaskUpstreamRelationList(projectCode, taskCode);
+
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode,
taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
+ List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new
ArrayList<>();
+ countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ {
+ put(123L, 2);
+ }
+ });
+ countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ {
+ put(124L, 1);
+ }
+ });
+ countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ {
+ put(125L, 3);
+ }
+ });
+
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
new Long[]{123L, 124L, 125L},
2)).thenReturn(countListGroupByProcessDefinitionCode);
+
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new
ArrayList())).thenReturn(3);
+ Mockito.when(processTaskRelationMapper.deleteBatchIds(new
ArrayList())).thenReturn(3);
+ Map<String, Object> result1 =
processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode,
"123", taskCode);
+ Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 6d4c79b..a03f8f0 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -23,6 +23,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import java.util.Map;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@@ -102,6 +103,45 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
/**
+ * query task relation by codes
+ *
+ * @param projectCode projectCode
+ * @param taskCode taskCode
+ * @param postTaskCodes postTaskCodes list
+ * @return ProcessTaskRelation
+ */
+ List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode")
long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes")
Long[] postTaskCodes);
+
+ /**
+ * query task relation by codes
+ *
+ * @param projectCode projectCode
+ * @param taskCode taskCode
+ * @param preTaskCodes preTaskCode list
+ * @return ProcessTaskRelation
+ */
+ List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[]
preTaskCodes);
+
+ /**
+ * count upstream by codes
+ *
+ * @param projectCode projectCode
+ * @param taskCode taskCode
+ * @param processDefinitionCodes processDefinitionCodes
+ * @return upstream count list group by process definition code
+ */
+ List<Map<Long, Integer>>
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long
projectCode,
+
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
@Param("taskCode") long taskCode);
+
+ /**
+ * batch update process task relation pre task
+ *
+ * @param processTaskRelationList process task relation list
+ * @return update num
+ */
+ int
batchUpdateProcessTaskRelationPreTask(@Param("processTaskRelationList")
List<ProcessTaskRelation> processTaskRelationList);
+
+ /**
* query by code
*
* @param projectCode projectCode
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 11602eb..95a6550 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -92,6 +92,62 @@
and post_task_code = #{taskCode}
</select>
+ <select id="queryDownstreamByCodes"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ WHERE project_code = #{projectCode}
+ and pre_task_code = #{taskCode}
+ <if test="postTaskCodes != null and postTaskCodes.length != 0">
+ and post_task_code in
+ <foreach collection="postTaskCodes" index="index" item="i"
open="(" separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ </select>
+
+ <select id="queryUpstreamByCodes"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ WHERE project_code = #{projectCode}
+ and post_task_code = #{taskCode}
+ <if test="preTaskCodes != null and preTaskCodes.length != 0">
+ and pre_task_code in
+ <foreach collection="preTaskCodes" index="index" item="i" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ </select>
+
+ <select id="countUpstreamByCodeGroupByProcessDefinitionCode"
resultType="java.lang.Integer">
+ select process_definition_code,
+ count(0)
+ from t_ds_process_task_relation
+ WHERE project_code = #{projectCode}
+ and post_task_code = #{taskCode}
+ <if test="processDefinitionCodes != null and
processDefinitionCodes.length != 0">
+ and process_definition_code in
+ <foreach collection="processDefinitionCodes" index="index"
item="i" open="(" separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ group by process_definition_code
+ </select>
+
+ <update id="batchUpdateProcessTaskRelationPreTask"
parameterType="java.util.List">
+ <foreach collection="processTaskRelationList"
item="processTaskRelation" index="index" open="" close="" separator=";">
+ update t_ds_process_task_relation
+ <set>
+ pre_task_code=#{processTaskRelation.preTaskCode},
+ pre_task_version=#{processTaskRelation.preTaskVersion}
+ </set>
+ <where>
+ WHERE id = #{processTaskRelation.id}
+ </where>
+ </foreach>
+ </update>
+
<select id="queryByCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>