This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 22ec6c0  [Feature][jsonsplit]Refactor process lineage (#5034)
22ec6c0 is described below

commit 22ec6c069d7a44f140b4575dd01e8f2396eccf51
Author: Simon <[email protected]>
AuthorDate: Fri Mar 12 18:22:34 2021 +0800

    [Feature][jsonsplit]Refactor process lineage (#5034)
    
    * Modify Project and ProjectUser Mapper
    
    * Modify Project and ProjectUser Mapper
    
    * project_code is bigint(20)
    
    * modify ERROR name
    
    * modify saveProcessDefine, remove the duplicate code with 
createTaskAndRelation
    
    * modify import/export processdefinition, add genProcessData
    
    * fix ut and bug
    
    * code style
    
    * repalce project_id with code
    
    * conflicts solve
    
    * conflicts solve
    
    * conflicts solve
    
    * bugfix
    
    * modify listResources mothod and remove getResourceIds mothod
    
    * 1
    
    * conflicts solve
    
    * modify listResources mothod and remove getResourceIds mothod
    
    * modify listResources mothod and remove getResourceIds mothod
    
    * replace processDefinitionVersion with processDefinitionLog
    
    * codestyle
    
    * codestyle
    
    * add mapper module ut
    
    * codestyle
    
    * fix ProcessInstanceMapperTest
    
    * codestyle
    
    * conflicts solve
    
    * conflicts solve
    
    * conflicts solve
    
    * conflicts solve
    
    * conflicts solve
    
    * fix ProcessInstanceMapperTest
    
    * fix ProjectMapperTest/ProjectUserMapperTest/ScheduleMapperTest
    
    * fix ProjectMapperTest/ProjectUserMapperTest/ScheduleMapperTest
    
    * fix TaskInstanceMapperTest
    
    * add TaskDefinitionLogMapperTest/TaskDefinitionMapperTest and bugfix
    
    * codestyle
    
    * codestyle
    
    * Refactor process lineage
    
    * Refactor process lineage
    
    * codestyle
    
    * codestyle
    
    * Refactor process lineage
    
    * Refactor process lineage
---
 .../service/impl/WorkFlowLineageServiceImpl.java   |  96 ++++++++++-------
 .../api/service/WorkFlowLineageServiceTest.java    |   2 -
 .../dao/entity/ProcessLineage.java                 | 120 +++++++++++++++++++++
 .../dao/entity/ProcessTaskRelation.java            |  15 +--
 .../dao/entity/WorkFlowRelation.java               |  14 +++
 .../dao/mapper/WorkFlowLineageMapper.java          |  37 +++++--
 .../dao/mapper/WorkFlowLineageMapper.xml           | 103 +++++++-----------
 .../dao/mapper/WorkFlowLineageMapperTest.java      |  15 ---
 8 files changed, 265 insertions(+), 137 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index 35ed5fc..ae63955 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -20,9 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 
@@ -46,6 +48,9 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
     private WorkFlowLineageMapper workFlowLineageMapper;
 
     @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
     private ProjectMapper projectMapper;
 
     @Override
@@ -58,56 +63,71 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
         return result;
     }
 
-    private void getWorkFlowRelationRecursion(Set<Integer> ids, 
List<WorkFlowRelation> workFlowRelations, Set<Integer> sourceIds) {
-        for (int id : ids) {
-            sourceIds.addAll(ids);
-            List<WorkFlowRelation> workFlowRelationsTmp = 
workFlowLineageMapper.querySourceTarget(id);
-            if (workFlowRelationsTmp != null && 
!workFlowRelationsTmp.isEmpty()) {
-                Set<Integer> idsTmp = new HashSet<>();
-                for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp) 
{
-                    if 
(!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) {
-                        idsTmp.add(workFlowRelation.getTargetWorkFlowId());
+    private void getRelation(Map<Integer, WorkFlowLineage> workFlowLineageMap,
+                             Set<WorkFlowRelation> workFlowRelations,
+                             ProcessLineage processLineage) {
+        List<ProcessLineage> relations = 
workFlowLineageMapper.queryCodeRelation(
+                processLineage.getPostTaskCode(), 
processLineage.getPostTaskVersion()
+                , processLineage.getProcessDefinitionCode(), 
processLineage.getProjectCode());
+
+        for (ProcessLineage relation : relations) {
+            if (relation.getProcessDefinitionCode() != null) {
+
+                relation.setPreTaskCode(processLineage.getPostTaskCode());
+                
relation.setPreTaskVersion(processLineage.getPostTaskVersion());
+
+                WorkFlowLineage pre = workFlowLineageMapper
+                        
.queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), 
processLineage.getProjectCode());
+                // sourceWorkFlowId = ""
+                if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) {
+                    workFlowLineageMap.put(pre.getWorkFlowId(), pre);
+                }
+
+                WorkFlowLineage post = workFlowLineageMapper
+                        
.queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(), 
relation.getProjectCode());
+
+                if (workFlowLineageMap.containsKey(post.getWorkFlowId())) {
+                    WorkFlowLineage workFlowLineage = 
workFlowLineageMap.get(post.getWorkFlowId());
+                    String sourceWorkFlowId = 
workFlowLineage.getSourceWorkFlowId();
+                    if (sourceWorkFlowId.equals("")) {
+                        
workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
+                    } else {
+                        workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + 
"," + pre.getWorkFlowId());
                     }
+
+                } else {
+                    
post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
+                    workFlowLineageMap.put(post.getWorkFlowId(), post);
+                }
+
+                WorkFlowRelation workFlowRelation = new WorkFlowRelation();
+                workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId());
+                workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId());
+                if (workFlowRelations.contains(workFlowRelation)) {
+                    continue;
                 }
-                workFlowRelations.addAll(workFlowRelationsTmp);
-                getWorkFlowRelationRecursion(idsTmp, workFlowRelations, 
sourceIds);
+                workFlowRelations.add(workFlowRelation);
+                getRelation(workFlowLineageMap, workFlowRelations, relation);
             }
         }
+
     }
 
     @Override
     public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int 
projectId) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.selectById(projectId);
-        List<WorkFlowLineage> workFlowLineageList = 
workFlowLineageMapper.queryByIds(ids, project.getCode());
-        Map<String, Object> workFlowLists = new HashMap<>();
-        Set<Integer> idsV = new HashSet<>();
-        if (ids == null || ids.isEmpty()) {
-            for (WorkFlowLineage workFlowLineage : workFlowLineageList) {
-                idsV.add(workFlowLineage.getWorkFlowId());
-            }
-        } else {
-            idsV = ids;
-        }
-        List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
-        Set<Integer> sourceIds = new HashSet<>();
-        getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds);
-
-        Set<Integer> idSet = new HashSet<>();
-        //If the incoming parameter is not empty, you need to add downstream 
workflow detail attributes
-        if (ids != null && !ids.isEmpty()) {
-            for (WorkFlowRelation workFlowRelation : workFlowRelations) {
-                idSet.add(workFlowRelation.getTargetWorkFlowId());
-            }
-            for (int id : ids) {
-                idSet.remove(id);
-            }
-            if (!idSet.isEmpty()) {
-                
workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, 
project.getCode()));
-            }
+        List<ProcessLineage> processLineages = 
workFlowLineageMapper.queryRelationByIds(ids, project.getCode());
+
+        Map<Integer, WorkFlowLineage> workFlowLineages = new HashMap<>();
+        Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
+
+        for (ProcessLineage processLineage : processLineages) {
+            getRelation(workFlowLineages, workFlowRelations, processLineage);
         }
 
-        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineageList);
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values());
         workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
         result.put(Constants.DATA_LIST, workFlowLists);
         putMsg(result, Status.SUCCESS);
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
index d946263..db75a43 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
@@ -66,8 +66,6 @@ public class WorkFlowLineageServiceTest {
         ids.add(1);
         ids.add(2);
 
-        when(workFlowLineageMapper.queryByIds(ids, 
1L)).thenReturn(getWorkFlowLineages());
-        
when(workFlowLineageMapper.querySourceTarget(1)).thenReturn(getWorkFlowRelation());
         Map<String, Object> result = 
workFlowLineageService.queryWorkFlowLineageByIds(ids,1);
         Map<String, Object> workFlowLists = (Map<String, 
Object>)result.get(Constants.DATA_LIST);
         List<WorkFlowLineage> workFlowLineages = 
(List<WorkFlowLineage>)workFlowLists.get("workFlowList");
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
new file mode 100644
index 0000000..678db42
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+/**
+ * Process lineage
+ */
+public class ProcessLineage {
+
+    /**
+     * project code
+     */
+    private Long projectCode;
+
+    /**
+     * post task code
+     */
+    private Long postTaskCode;
+
+    /**
+     * post task version
+     */
+    private int postTaskVersion;
+
+    /**
+     * pre task code
+     */
+    private Long preTaskCode;
+
+    /**
+     * pre task version
+     */
+    private int preTaskVersion;
+
+    /**
+     * process definition code
+     */
+    private Long processDefinitionCode;
+
+    /**
+     * process definition version
+     */
+    private int processDefinitionVersion;
+
+    public Long getProjectCode() {
+        return projectCode;
+    }
+
+    public void setProjectCode(Long projectCode) {
+        this.projectCode = projectCode;
+    }
+
+    public Long getProcessDefinitionCode() {
+        return processDefinitionCode;
+    }
+
+    public void setProcessDefinitionCode(Long processDefinitionCode) {
+        this.processDefinitionCode = processDefinitionCode;
+    }
+
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
+    public void setPostTaskCode(Long postTaskCode) {
+        this.postTaskCode = postTaskCode;
+    }
+
+    public Long getPreTaskCode() {
+        return preTaskCode;
+    }
+
+    public void setPreTaskCode(Long preTaskCode) {
+        this.preTaskCode = preTaskCode;
+    }
+
+    public int getPreTaskVersion() {
+        return preTaskVersion;
+    }
+
+    public void setPreTaskVersion(int preTaskVersion) {
+        this.preTaskVersion = preTaskVersion;
+    }
+
+    public int getPostTaskVersion() {
+        return postTaskVersion;
+    }
+
+    public void setPostTaskVersion(int postTaskVersion) {
+        this.postTaskVersion = postTaskVersion;
+    }
+
+    public long getPostTaskCode() {
+        return postTaskCode;
+    }
+
+    public void setPostTaskCode(long postTaskCode) {
+        this.postTaskCode = postTaskCode;
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index aad2b56..2afeb68 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -17,21 +17,14 @@
 
 package org.apache.dolphinscheduler.dao.entity;
 
+import org.apache.dolphinscheduler.common.enums.ConditionType;
+
+import java.util.Date;
+
 import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
-import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.fasterxml.jackson.annotation.JsonFormat;
-import org.apache.dolphinscheduler.common.enums.ConditionType;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * process task relation
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
index c03c68e..3a74d80 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.dao.entity;
 
+import java.util.Objects;
+
 public class WorkFlowRelation {
     private int sourceWorkFlowId;
     private int targetWorkFlowId;
@@ -35,4 +37,16 @@ public class WorkFlowRelation {
     public void setTargetWorkFlowId(int targetWorkFlowId) {
         this.targetWorkFlowId = targetWorkFlowId;
     }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof WorkFlowRelation
+                && this.sourceWorkFlowId == ((WorkFlowRelation) 
obj).getSourceWorkFlowId()
+                && this.targetWorkFlowId == ((WorkFlowRelation) 
obj).getTargetWorkFlowId();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sourceWorkFlowId, targetWorkFlowId);
+    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index 1409654..14b0ee1 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -16,9 +16,11 @@
  */
 package org.apache.dolphinscheduler.dao.mapper;
 
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
-import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+
 import org.apache.ibatis.annotations.Param;
+
 import java.util.List;
 import java.util.Set;
 
@@ -26,24 +28,41 @@ public interface WorkFlowLineageMapper {
 
     /**
      * queryByName
+     *
      * @param searchVal searchVal
      * @param projectCode projectCode
-     * @return  WorkFlowLineage list
+     * @return WorkFlowLineage list
      */
     List<WorkFlowLineage> queryByName(@Param("searchVal") String searchVal, 
@Param("projectCode") Long projectCode);
 
     /**
-     * queryByIds
+     * queryCodeRelation
+     *
+     * @param taskCode taskCode
+     * @param taskVersion taskVersion
+     * @param processDefinitionCode processDefinitionCode
+     * @return ProcessLineage
+     */
+    List<ProcessLineage> queryCodeRelation(
+            @Param("taskCode") Long taskCode, @Param("taskVersion") int 
taskVersion,
+            @Param("processDefinitionCode") Long processDefinitionCode, 
@Param("projectCode") Long projectCode);
+
+    /**
+     * queryRelationByIds
+     *
      * @param ids ids
      * @param projectCode projectCode
-     * @return WorkFlowLineage list
+     * @return ProcessLineage
      */
-    List<WorkFlowLineage> queryByIds(@Param("ids") Set<Integer> ids, 
@Param("projectCode") Long projectCode);
+    List<ProcessLineage> queryRelationByIds(@Param("ids") Set<Integer> ids, 
@Param("projectCode") Long projectCode);
 
     /**
-     *  query SourceTarget
-     * @param id id
-     * @return WorkFlowRelation list
+     * queryWorkFlowLineageByCode
+     *
+     * @param processDefinitionCode processDefinitioncode
+     * @param projectCode projectCode
+     * @return WorkFlowLineage
      */
-    List<WorkFlowRelation> querySourceTarget(@Param("id") int id);
+    WorkFlowLineage queryWorkFlowLineageByCode(@Param("processDefinitionCode") 
Long processDefinitionCode, @Param("projectCode") Long projectCode);
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 897e1ca..a1dbeef 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -18,86 +18,65 @@
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"; >
 <mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper">
+
     <select id="queryByName" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
         select tepd.id as work_flow_id,tepd.name as work_flow_name
         from t_ds_process_definition tepd
         left join t_ds_schedules tes on tepd.id = tes.process_definition_id
         where tepd.project_code = #{projectCode}
         <if test="searchVal != null and searchVal != ''">
-            and  tepd.name like concat('%', #{searchVal}, '%')
+            and tepd.name like concat('%', #{searchVal}, '%')
         </if>
     </select>
-    <select id="queryByIds" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" 
databaseId="mysql">
-        select tepd.id as work_flow_id,tepd.name as work_flow_name,
-               (case when json_extract(tepd.process_definition_json, 
'$**.dependItemList') is not null then 1 else 0 end) as is_depend_work_flow,
-                          json_extract(tepd.process_definition_json, 
'$**.definitionId') as source_work_flow_id,
-                          tepd.release_state as work_flow_publish_status,
-                          tes.start_time as schedule_start_time,
-                          tes.end_time as schedule_end_time,
-                          tes.crontab as crontab,
-                          tes.release_state as schedule_publish_status
-        from t_ds_process_definition tepd
-        left join t_ds_schedules tes on tepd.id = tes.process_definition_id
-        where tepd.project_code = #{projectCode}
+
+    <select id="queryRelationByIds" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+        select ptr.project_code,
+                ptr.post_task_code,
+                ptr.post_task_version,
+                ptr.pre_task_code,
+                ptr.pre_task_version,
+                ptr.process_definition_code,
+                ptr.process_definition_version
+        from t_ds_process_definition pd
+        join t_ds_process_task_relation ptr on pd.code = 
ptr.process_definition_code and pd.version =
+        ptr.process_definition_version
+        where pd.project_code = #{projectCode}
         <if test="ids != null and ids.size()>0">
-            and  tepd.id in
+            and pd.id in
             <foreach collection="ids" index="index" item="i" open="(" 
separator="," close=")">
                 #{i}
             </foreach>
         </if>
     </select>
 
-    <select id="queryByIds" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" 
databaseId="pg">
-        select a.work_flow_id,
-               a.work_flow_name,
-               a.is_depend_work_flow,
-               array_agg(a.source_id) as source_id,
-               a.work_flow_publish_status,
-               a.schedule_start_time,
-               a.schedule_end_time,
-               a.crontab,
-               a.schedule_publish_status
-         from (
-               select tepd.id as work_flow_id,tepd.name as work_flow_name,
-                      case when 
tepd.process_definition_json::json#>'{tasks,1,dependence}' is not null then 1 
else 0 end as is_depend_work_flow,
-                      
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}')
 as source_id,
-                      tepd.release_state as work_flow_publish_status,
-                      tes.start_time as schedule_start_time,
-                      tes.end_time as schedule_end_time,
-                      tes.crontab as crontab,
-                      tes.release_state as schedule_publish_status
-                 from t_ds_process_definition tepd
-                 left join t_ds_schedules tes on tepd.id = 
tes.process_definition_id
-                 where tepd.project_code = #{projectCode}
-                 <if test="ids != null and ids.size()>0">
-                     and  tepd.id in
-                     <foreach collection="ids" index="index" item="i" open="(" 
separator="," close=")">
-                     #{i}
-                     </foreach>
-                </if>
-             ) a
-        where (a.is_depend_work_flow = 1 and source_id is not null) or 
(a.is_depend_work_flow = 0)
-        group by 
a.work_flow_id,a.work_flow_name,a.is_depend_work_flow,a.work_flow_publish_status,a.schedule_start_time,
-                 a.schedule_end_time,a.crontab,a.schedule_publish_status
-    </select>
+    <select id="queryCodeRelation" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+        select project_code
+               post_task_code,
+               post_task_version,
+               pre_task_code,
+               pre_task_version,
+               process_definition_code,
+               process_definition_version
+         from t_ds_process_task_relation ptr
+         where ptr.pre_task_code=#{taskCode}
+                and ptr.pre_task_version=#{taskVersion}
+                and ptr.process_definition_code!=#{processDefinitionCode}
+                and ptr.project_code =#{projectCode}
 
-    <select id="querySourceTarget" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation" 
databaseId="mysql">
-        select id as target_work_flow_id,#{id} as source_work_flow_id
-        from t_ds_process_definition t
-        where json_extract(t.process_definition_json, '$**.dependItemList') is 
not null
-          and find_in_set(#{id}, 
replace(replace(replace(json_extract(t.process_definition_json, 
'$**.definitionId'), '[', ''),']', ''), ' ', '')) > 0
     </select>
 
-    <select id="querySourceTarget" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation" 
databaseId="pg">
-        select a.work_flow_id as target_work_flow_id,
-               a.source_id as source_work_flow_id
-         from (
-               select tepd.id as work_flow_id,
-                      
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}')
 as source_id
-                 from t_ds_process_definition tepd
-                 left join t_ds_schedules tes on tepd.id = 
tes.process_definition_id
-                where tepd.project_id = 1) a
-        where source_id = #{id}::text;
+    <select id="queryWorkFlowLineageByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"
+            databaseId="mysql">
+        select tepd.id as work_flow_id,tepd.name as work_flow_name,
+               "" as source_work_flow_id,
+                tepd.release_state as work_flow_publish_status,
+                tes.start_time as schedule_start_time,
+                tes.end_time as schedule_end_time,
+                tes.crontab as crontab,
+                tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.id = tes.process_definition_id
+        where tepd.project_code = #{projectCode} and tepd.code = 
#{processDefinitionCode}
     </select>
 
 </mapper>
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index d9c2c7b..909d713 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -44,19 +44,4 @@ public class WorkFlowLineageMapperTest {
         List<WorkFlowLineage> workFlowLineages = 
workFlowLineageMapper.queryByName("test",1L);
         Assert.assertNotEquals(workFlowLineages.size(), 0);
     }
-
-
-    @Test
-    public void testQueryByIds() {
-        Set<Integer> ids = new HashSet<>();
-        ids.add(1);
-        List<WorkFlowLineage> workFlowLineages = 
workFlowLineageMapper.queryByIds(ids,1L);
-        Assert.assertNotEquals(workFlowLineages.size(), 0);
-    }
-
-    @Test
-    public void testQuerySourceTarget() {
-        List<WorkFlowRelation> workFlowRelations = 
workFlowLineageMapper.querySourceTarget(1);
-        Assert.assertNotEquals(workFlowRelations.size(), 0);
-    }
 }

Reply via email to