This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 22ec6c0 [Feature][jsonsplit]Refactor process lineage (#5034)
22ec6c0 is described below
commit 22ec6c069d7a44f140b4575dd01e8f2396eccf51
Author: Simon <[email protected]>
AuthorDate: Fri Mar 12 18:22:34 2021 +0800
[Feature][jsonsplit]Refactor process lineage (#5034)
* Modify Project and ProjectUser Mapper
* Modify Project and ProjectUser Mapper
* project_code is bigint(20)
* modify ERROR name
* modify saveProcessDefine, remove the duplicate code with
createTaskAndRelation
* modify import/export processdefinition, add genProcessData
* fix ut and bug
* code style
* repalce project_id with code
* conflicts solve
* conflicts solve
* conflicts solve
* bugfix
* modify listResources mothod and remove getResourceIds mothod
* 1
* conflicts solve
* modify listResources mothod and remove getResourceIds mothod
* modify listResources mothod and remove getResourceIds mothod
* replace processDefinitionVersion with processDefinitionLog
* codestyle
* codestyle
* add mapper module ut
* codestyle
* fix ProcessInstanceMapperTest
* codestyle
* conflicts solve
* conflicts solve
* conflicts solve
* conflicts solve
* conflicts solve
* fix ProcessInstanceMapperTest
* fix ProjectMapperTest/ProjectUserMapperTest/ScheduleMapperTest
* fix ProjectMapperTest/ProjectUserMapperTest/ScheduleMapperTest
* fix TaskInstanceMapperTest
* add TaskDefinitionLogMapperTest/TaskDefinitionMapperTest and bugfix
* codestyle
* codestyle
* Refactor process lineage
* Refactor process lineage
* codestyle
* codestyle
* Refactor process lineage
* Refactor process lineage
---
.../service/impl/WorkFlowLineageServiceImpl.java | 96 ++++++++++-------
.../api/service/WorkFlowLineageServiceTest.java | 2 -
.../dao/entity/ProcessLineage.java | 120 +++++++++++++++++++++
.../dao/entity/ProcessTaskRelation.java | 15 +--
.../dao/entity/WorkFlowRelation.java | 14 +++
.../dao/mapper/WorkFlowLineageMapper.java | 37 +++++--
.../dao/mapper/WorkFlowLineageMapper.xml | 103 +++++++-----------
.../dao/mapper/WorkFlowLineageMapperTest.java | 15 ---
8 files changed, 265 insertions(+), 137 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index 35ed5fc..ae63955 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -20,9 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
@@ -46,6 +48,9 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
private WorkFlowLineageMapper workFlowLineageMapper;
@Autowired
+ private ProcessDefinitionMapper processDefinitionMapper;
+
+ @Autowired
private ProjectMapper projectMapper;
@Override
@@ -58,56 +63,71 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
return result;
}
- private void getWorkFlowRelationRecursion(Set<Integer> ids,
List<WorkFlowRelation> workFlowRelations, Set<Integer> sourceIds) {
- for (int id : ids) {
- sourceIds.addAll(ids);
- List<WorkFlowRelation> workFlowRelationsTmp =
workFlowLineageMapper.querySourceTarget(id);
- if (workFlowRelationsTmp != null &&
!workFlowRelationsTmp.isEmpty()) {
- Set<Integer> idsTmp = new HashSet<>();
- for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp)
{
- if
(!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) {
- idsTmp.add(workFlowRelation.getTargetWorkFlowId());
+ private void getRelation(Map<Integer, WorkFlowLineage> workFlowLineageMap,
+ Set<WorkFlowRelation> workFlowRelations,
+ ProcessLineage processLineage) {
+ List<ProcessLineage> relations =
workFlowLineageMapper.queryCodeRelation(
+ processLineage.getPostTaskCode(),
processLineage.getPostTaskVersion()
+ , processLineage.getProcessDefinitionCode(),
processLineage.getProjectCode());
+
+ for (ProcessLineage relation : relations) {
+ if (relation.getProcessDefinitionCode() != null) {
+
+ relation.setPreTaskCode(processLineage.getPostTaskCode());
+
relation.setPreTaskVersion(processLineage.getPostTaskVersion());
+
+ WorkFlowLineage pre = workFlowLineageMapper
+
.queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(),
processLineage.getProjectCode());
+ // sourceWorkFlowId = ""
+ if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) {
+ workFlowLineageMap.put(pre.getWorkFlowId(), pre);
+ }
+
+ WorkFlowLineage post = workFlowLineageMapper
+
.queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(),
relation.getProjectCode());
+
+ if (workFlowLineageMap.containsKey(post.getWorkFlowId())) {
+ WorkFlowLineage workFlowLineage =
workFlowLineageMap.get(post.getWorkFlowId());
+ String sourceWorkFlowId =
workFlowLineage.getSourceWorkFlowId();
+ if (sourceWorkFlowId.equals("")) {
+
workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
+ } else {
+ workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId +
"," + pre.getWorkFlowId());
}
+
+ } else {
+
post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
+ workFlowLineageMap.put(post.getWorkFlowId(), post);
+ }
+
+ WorkFlowRelation workFlowRelation = new WorkFlowRelation();
+ workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId());
+ workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId());
+ if (workFlowRelations.contains(workFlowRelation)) {
+ continue;
}
- workFlowRelations.addAll(workFlowRelationsTmp);
- getWorkFlowRelationRecursion(idsTmp, workFlowRelations,
sourceIds);
+ workFlowRelations.add(workFlowRelation);
+ getRelation(workFlowLineageMap, workFlowRelations, relation);
}
}
+
}
@Override
public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int
projectId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.selectById(projectId);
- List<WorkFlowLineage> workFlowLineageList =
workFlowLineageMapper.queryByIds(ids, project.getCode());
- Map<String, Object> workFlowLists = new HashMap<>();
- Set<Integer> idsV = new HashSet<>();
- if (ids == null || ids.isEmpty()) {
- for (WorkFlowLineage workFlowLineage : workFlowLineageList) {
- idsV.add(workFlowLineage.getWorkFlowId());
- }
- } else {
- idsV = ids;
- }
- List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
- Set<Integer> sourceIds = new HashSet<>();
- getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds);
-
- Set<Integer> idSet = new HashSet<>();
- //If the incoming parameter is not empty, you need to add downstream
workflow detail attributes
- if (ids != null && !ids.isEmpty()) {
- for (WorkFlowRelation workFlowRelation : workFlowRelations) {
- idSet.add(workFlowRelation.getTargetWorkFlowId());
- }
- for (int id : ids) {
- idSet.remove(id);
- }
- if (!idSet.isEmpty()) {
-
workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet,
project.getCode()));
- }
+ List<ProcessLineage> processLineages =
workFlowLineageMapper.queryRelationByIds(ids, project.getCode());
+
+ Map<Integer, WorkFlowLineage> workFlowLineages = new HashMap<>();
+ Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
+
+ for (ProcessLineage processLineage : processLineages) {
+ getRelation(workFlowLineages, workFlowRelations, processLineage);
}
- workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineageList);
+ Map<String, Object> workFlowLists = new HashMap<>();
+ workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values());
workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
result.put(Constants.DATA_LIST, workFlowLists);
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
index d946263..db75a43 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
@@ -66,8 +66,6 @@ public class WorkFlowLineageServiceTest {
ids.add(1);
ids.add(2);
- when(workFlowLineageMapper.queryByIds(ids,
1L)).thenReturn(getWorkFlowLineages());
-
when(workFlowLineageMapper.querySourceTarget(1)).thenReturn(getWorkFlowRelation());
Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByIds(ids,1);
Map<String, Object> workFlowLists = (Map<String,
Object>)result.get(Constants.DATA_LIST);
List<WorkFlowLineage> workFlowLineages =
(List<WorkFlowLineage>)workFlowLists.get("workFlowList");
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
new file mode 100644
index 0000000..678db42
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+/**
+ * Process lineage
+ */
+public class ProcessLineage {
+
+ /**
+ * project code
+ */
+ private Long projectCode;
+
+ /**
+ * post task code
+ */
+ private Long postTaskCode;
+
+ /**
+ * post task version
+ */
+ private int postTaskVersion;
+
+ /**
+ * pre task code
+ */
+ private Long preTaskCode;
+
+ /**
+ * pre task version
+ */
+ private int preTaskVersion;
+
+ /**
+ * process definition code
+ */
+ private Long processDefinitionCode;
+
+ /**
+ * process definition version
+ */
+ private int processDefinitionVersion;
+
+ public Long getProjectCode() {
+ return projectCode;
+ }
+
+ public void setProjectCode(Long projectCode) {
+ this.projectCode = projectCode;
+ }
+
+ public Long getProcessDefinitionCode() {
+ return processDefinitionCode;
+ }
+
+ public void setProcessDefinitionCode(Long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
+ }
+
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
+ }
+
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
+ }
+
+ public void setPostTaskCode(Long postTaskCode) {
+ this.postTaskCode = postTaskCode;
+ }
+
+ public Long getPreTaskCode() {
+ return preTaskCode;
+ }
+
+ public void setPreTaskCode(Long preTaskCode) {
+ this.preTaskCode = preTaskCode;
+ }
+
+ public int getPreTaskVersion() {
+ return preTaskVersion;
+ }
+
+ public void setPreTaskVersion(int preTaskVersion) {
+ this.preTaskVersion = preTaskVersion;
+ }
+
+ public int getPostTaskVersion() {
+ return postTaskVersion;
+ }
+
+ public void setPostTaskVersion(int postTaskVersion) {
+ this.postTaskVersion = postTaskVersion;
+ }
+
+ public long getPostTaskCode() {
+ return postTaskCode;
+ }
+
+ public void setPostTaskCode(long postTaskCode) {
+ this.postTaskCode = postTaskCode;
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index aad2b56..2afeb68 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -17,21 +17,14 @@
package org.apache.dolphinscheduler.dao.entity;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
+
+import java.util.Date;
+
import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
-import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
-import org.apache.dolphinscheduler.common.enums.ConditionType;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
/**
* process task relation
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
index c03c68e..3a74d80 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
@@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.dao.entity;
+import java.util.Objects;
+
public class WorkFlowRelation {
private int sourceWorkFlowId;
private int targetWorkFlowId;
@@ -35,4 +37,16 @@ public class WorkFlowRelation {
public void setTargetWorkFlowId(int targetWorkFlowId) {
this.targetWorkFlowId = targetWorkFlowId;
}
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof WorkFlowRelation
+ && this.sourceWorkFlowId == ((WorkFlowRelation)
obj).getSourceWorkFlowId()
+ && this.targetWorkFlowId == ((WorkFlowRelation)
obj).getTargetWorkFlowId();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sourceWorkFlowId, targetWorkFlowId);
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index 1409654..14b0ee1 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -16,9 +16,11 @@
*/
package org.apache.dolphinscheduler.dao.mapper;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
-import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+
import org.apache.ibatis.annotations.Param;
+
import java.util.List;
import java.util.Set;
@@ -26,24 +28,41 @@ public interface WorkFlowLineageMapper {
/**
* queryByName
+ *
* @param searchVal searchVal
* @param projectCode projectCode
- * @return WorkFlowLineage list
+ * @return WorkFlowLineage list
*/
List<WorkFlowLineage> queryByName(@Param("searchVal") String searchVal,
@Param("projectCode") Long projectCode);
/**
- * queryByIds
+ * queryCodeRelation
+ *
+ * @param taskCode taskCode
+ * @param taskVersion taskVersion
+ * @param processDefinitionCode processDefinitionCode
+ * @return ProcessLineage
+ */
+ List<ProcessLineage> queryCodeRelation(
+ @Param("taskCode") Long taskCode, @Param("taskVersion") int
taskVersion,
+ @Param("processDefinitionCode") Long processDefinitionCode,
@Param("projectCode") Long projectCode);
+
+ /**
+ * queryRelationByIds
+ *
* @param ids ids
* @param projectCode projectCode
- * @return WorkFlowLineage list
+ * @return ProcessLineage
*/
- List<WorkFlowLineage> queryByIds(@Param("ids") Set<Integer> ids,
@Param("projectCode") Long projectCode);
+ List<ProcessLineage> queryRelationByIds(@Param("ids") Set<Integer> ids,
@Param("projectCode") Long projectCode);
/**
- * query SourceTarget
- * @param id id
- * @return WorkFlowRelation list
+ * queryWorkFlowLineageByCode
+ *
+ * @param processDefinitionCode processDefinitioncode
+ * @param projectCode projectCode
+ * @return WorkFlowLineage
*/
- List<WorkFlowRelation> querySourceTarget(@Param("id") int id);
+ WorkFlowLineage queryWorkFlowLineageByCode(@Param("processDefinitionCode")
Long processDefinitionCode, @Param("projectCode") Long projectCode);
+
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 897e1ca..a1dbeef 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -18,86 +18,65 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper">
+
<select id="queryByName"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.id as work_flow_id,tepd.name as work_flow_name
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.id = tes.process_definition_id
where tepd.project_code = #{projectCode}
<if test="searchVal != null and searchVal != ''">
- and tepd.name like concat('%', #{searchVal}, '%')
+ and tepd.name like concat('%', #{searchVal}, '%')
</if>
</select>
- <select id="queryByIds"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"
databaseId="mysql">
- select tepd.id as work_flow_id,tepd.name as work_flow_name,
- (case when json_extract(tepd.process_definition_json,
'$**.dependItemList') is not null then 1 else 0 end) as is_depend_work_flow,
- json_extract(tepd.process_definition_json,
'$**.definitionId') as source_work_flow_id,
- tepd.release_state as work_flow_publish_status,
- tes.start_time as schedule_start_time,
- tes.end_time as schedule_end_time,
- tes.crontab as crontab,
- tes.release_state as schedule_publish_status
- from t_ds_process_definition tepd
- left join t_ds_schedules tes on tepd.id = tes.process_definition_id
- where tepd.project_code = #{projectCode}
+
+ <select id="queryRelationByIds"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+ select ptr.project_code,
+ ptr.post_task_code,
+ ptr.post_task_version,
+ ptr.pre_task_code,
+ ptr.pre_task_version,
+ ptr.process_definition_code,
+ ptr.process_definition_version
+ from t_ds_process_definition pd
+ join t_ds_process_task_relation ptr on pd.code =
ptr.process_definition_code and pd.version =
+ ptr.process_definition_version
+ where pd.project_code = #{projectCode}
<if test="ids != null and ids.size()>0">
- and tepd.id in
+ and pd.id in
<foreach collection="ids" index="index" item="i" open="("
separator="," close=")">
#{i}
</foreach>
</if>
</select>
- <select id="queryByIds"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"
databaseId="pg">
- select a.work_flow_id,
- a.work_flow_name,
- a.is_depend_work_flow,
- array_agg(a.source_id) as source_id,
- a.work_flow_publish_status,
- a.schedule_start_time,
- a.schedule_end_time,
- a.crontab,
- a.schedule_publish_status
- from (
- select tepd.id as work_flow_id,tepd.name as work_flow_name,
- case when
tepd.process_definition_json::json#>'{tasks,1,dependence}' is not null then 1
else 0 end as is_depend_work_flow,
-
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}')
as source_id,
- tepd.release_state as work_flow_publish_status,
- tes.start_time as schedule_start_time,
- tes.end_time as schedule_end_time,
- tes.crontab as crontab,
- tes.release_state as schedule_publish_status
- from t_ds_process_definition tepd
- left join t_ds_schedules tes on tepd.id =
tes.process_definition_id
- where tepd.project_code = #{projectCode}
- <if test="ids != null and ids.size()>0">
- and tepd.id in
- <foreach collection="ids" index="index" item="i" open="("
separator="," close=")">
- #{i}
- </foreach>
- </if>
- ) a
- where (a.is_depend_work_flow = 1 and source_id is not null) or
(a.is_depend_work_flow = 0)
- group by
a.work_flow_id,a.work_flow_name,a.is_depend_work_flow,a.work_flow_publish_status,a.schedule_start_time,
- a.schedule_end_time,a.crontab,a.schedule_publish_status
- </select>
+ <select id="queryCodeRelation"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+ select project_code
+ post_task_code,
+ post_task_version,
+ pre_task_code,
+ pre_task_version,
+ process_definition_code,
+ process_definition_version
+ from t_ds_process_task_relation ptr
+ where ptr.pre_task_code=#{taskCode}
+ and ptr.pre_task_version=#{taskVersion}
+ and ptr.process_definition_code!=#{processDefinitionCode}
+ and ptr.project_code =#{projectCode}
- <select id="querySourceTarget"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation"
databaseId="mysql">
- select id as target_work_flow_id,#{id} as source_work_flow_id
- from t_ds_process_definition t
- where json_extract(t.process_definition_json, '$**.dependItemList') is
not null
- and find_in_set(#{id},
replace(replace(replace(json_extract(t.process_definition_json,
'$**.definitionId'), '[', ''),']', ''), ' ', '')) > 0
</select>
- <select id="querySourceTarget"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation"
databaseId="pg">
- select a.work_flow_id as target_work_flow_id,
- a.source_id as source_work_flow_id
- from (
- select tepd.id as work_flow_id,
-
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}')
as source_id
- from t_ds_process_definition tepd
- left join t_ds_schedules tes on tepd.id =
tes.process_definition_id
- where tepd.project_id = 1) a
- where source_id = #{id}::text;
+ <select id="queryWorkFlowLineageByCode"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"
+ databaseId="mysql">
+ select tepd.id as work_flow_id,tepd.name as work_flow_name,
+ "" as source_work_flow_id,
+ tepd.release_state as work_flow_publish_status,
+ tes.start_time as schedule_start_time,
+ tes.end_time as schedule_end_time,
+ tes.crontab as crontab,
+ tes.release_state as schedule_publish_status
+ from t_ds_process_definition tepd
+ left join t_ds_schedules tes on tepd.id = tes.process_definition_id
+ where tepd.project_code = #{projectCode} and tepd.code =
#{processDefinitionCode}
</select>
</mapper>
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index d9c2c7b..909d713 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -44,19 +44,4 @@ public class WorkFlowLineageMapperTest {
List<WorkFlowLineage> workFlowLineages =
workFlowLineageMapper.queryByName("test",1L);
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
-
-
- @Test
- public void testQueryByIds() {
- Set<Integer> ids = new HashSet<>();
- ids.add(1);
- List<WorkFlowLineage> workFlowLineages =
workFlowLineageMapper.queryByIds(ids,1L);
- Assert.assertNotEquals(workFlowLineages.size(), 0);
- }
-
- @Test
- public void testQuerySourceTarget() {
- List<WorkFlowRelation> workFlowRelations =
workFlowLineageMapper.querySourceTarget(1);
- Assert.assertNotEquals(workFlowRelations.size(), 0);
- }
}