This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 07d6542  [Fix-6139][API] fix workFlowLineage relation (#6217)
07d6542 is described below

commit 07d6542febe1b4ad3f4ca3e69af654b2f6538933
Author: JinyLeeChina <[email protected]>
AuthorDate: Wed Sep 15 17:42:21 2021 +0800

    [Fix-6139][API] fix workFlowLineage relation (#6217)
    
    * fix bug of view-tree api
    
    * fix bug of view-tree api
    
    * fix ut
    
    * fix ut
    
    * fix master buildFlowDag error
    
    * fix workFlowLineage relation
    
    * fix ut
    
    * fix ut
    
    * fix ut
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../service/impl/WorkFlowLineageServiceImpl.java   | 143 ++++++++++++++-------
 .../api/service/WorkFlowLineageServiceTest.java    |  17 ++-
 .../common/model/DependentItem.java                |  20 ++-
 .../dao/entity/WorkFlowRelation.java               |  19 +++
 .../dao/mapper/WorkFlowLineageMapper.java          |  18 ++-
 .../dao/mapper/WorkFlowLineageMapper.xml           |  24 +++-
 .../dao/mapper/WorkFlowLineageMapperTest.java      |   4 +-
 .../service/process/ProcessService.java            |   1 -
 8 files changed, 178 insertions(+), 68 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index c37b141..d665a1a 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -20,20 +20,32 @@ package org.apache.dolphinscheduler.api.service.impl;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -50,6 +62,9 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
     @Autowired
     private ProjectMapper projectMapper;
 
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     @Override
     public Map<String, Object> queryWorkFlowLineageByName(long projectCode, 
String workFlowName) {
         Map<String, Object> result = new HashMap<>();
@@ -72,12 +87,47 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
             putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
             return result;
         }
-        WorkFlowLineage workFlowLineage = 
workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
-        result.put(Constants.DATA_LIST, workFlowLineage);
+        Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
+        Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
+        Set<Long> sourceWorkFlowCodes = Sets.newHashSet(workFlowCode);
+        recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, 
sourceWorkFlowCodes);
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_LIST, 
workFlowLineagesMap.values());
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
+        result.put(Constants.DATA_LIST, workFlowLists);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
+    private void recursiveWorkFlow(long projectCode,
+                                   Map<Long, WorkFlowLineage> 
workFlowLineagesMap,
+                                   Set<WorkFlowRelation> workFlowRelations,
+                                   Set<Long> sourceWorkFlowCodes) {
+        for (Long workFlowCode : sourceWorkFlowCodes) {
+            WorkFlowLineage workFlowLineage = 
workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
+            workFlowLineagesMap.put(workFlowCode, workFlowLineage);
+            List<ProcessLineage> processLineages = 
workFlowLineageMapper.queryProcessLineageByCode(projectCode, workFlowCode);
+            List<TaskDefinition> taskDefinitionList = new ArrayList<>();
+            for (ProcessLineage processLineage : processLineages) {
+                if (processLineage.getPreTaskCode() > 0) {
+                    taskDefinitionList.add(new 
TaskDefinition(processLineage.getPreTaskCode(), 
processLineage.getPreTaskVersion()));
+                }
+                if (processLineage.getPostTaskCode() > 0) {
+                    taskDefinitionList.add(new 
TaskDefinition(processLineage.getPostTaskCode(), 
processLineage.getPostTaskVersion()));
+                }
+            }
+            sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, 
workFlowCode, taskDefinitionList);
+            if (sourceWorkFlowCodes.isEmpty()) {
+                workFlowRelations.add(new WorkFlowRelation(0L, workFlowCode));
+                return;
+            } else {
+                
workFlowLineagesMap.get(workFlowCode).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes,
 Constants.COMMA));
+                sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new 
WorkFlowRelation(code, workFlowCode)));
+                recursiveWorkFlow(projectCode, workFlowLineagesMap, 
workFlowRelations, sourceWorkFlowCodes);
+            }
+        }
+    }
+
     @Override
     public Map<String, Object> queryWorkFlowLineage(long projectCode) {
         Map<String, Object> result = new HashMap<>();
@@ -87,58 +137,65 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
             return result;
         }
         List<ProcessLineage> processLineages = 
workFlowLineageMapper.queryProcessLineage(projectCode);
-
-        Map<Long, WorkFlowLineage> workFlowLineages = new HashMap<>();
+        Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
         Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
-
-        for (ProcessLineage processLineage : processLineages) {
-            getRelation(workFlowLineages, workFlowRelations, processLineage);
+        if (!processLineages.isEmpty()) {
+            List<WorkFlowLineage> workFlowLineages = 
workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages);
+            workFlowLineagesMap = 
workFlowLineages.stream().collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode,
 workFlowLineage -> workFlowLineage));
+            Map<Long, List<TaskDefinition>> workFlowMap = new HashMap<>();
+            for (ProcessLineage processLineage : processLineages) {
+                workFlowMap.compute(processLineage.getProcessDefinitionCode(), 
(k, v) -> {
+                    if (v == null) {
+                        v = new ArrayList<>();
+                    }
+                    if (processLineage.getPreTaskCode() > 0) {
+                        v.add(new 
TaskDefinition(processLineage.getPreTaskCode(), 
processLineage.getPreTaskVersion()));
+                    }
+                    if (processLineage.getPostTaskCode() > 0) {
+                        v.add(new 
TaskDefinition(processLineage.getPostTaskCode(), 
processLineage.getPostTaskVersion()));
+                    }
+                    return v;
+                });
+            }
+            for (Entry<Long, List<TaskDefinition>> workFlow : 
workFlowMap.entrySet()) {
+                Set<Long> sourceWorkFlowCodes = 
querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue());
+                if (sourceWorkFlowCodes.isEmpty()) {
+                    workFlowRelations.add(new WorkFlowRelation(0L, 
workFlow.getKey()));
+                } else {
+                    
workFlowLineagesMap.get(workFlow.getKey()).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes,
 Constants.COMMA));
+                    sourceWorkFlowCodes.forEach(code -> 
workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey())));
+                }
+            }
         }
-
         Map<String, Object> workFlowLists = new HashMap<>();
-        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values());
+        workFlowLists.put(Constants.WORKFLOW_LIST, 
workFlowLineagesMap.values());
         workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
         result.put(Constants.DATA_LIST, workFlowLists);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
-    private void getRelation(Map<Long, WorkFlowLineage> workFlowLineageMap,
-                             Set<WorkFlowRelation> workFlowRelations,
-                             ProcessLineage processLineage) {
-        List<ProcessLineage> relations = 
workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
-            processLineage.getProcessDefinitionCode(), 
processLineage.getPostTaskCode(), processLineage.getPostTaskVersion());
-        if (!relations.isEmpty()) {
-            Set<Long> preWorkFlowCodes = new HashSet<>();
-            List<ProcessLineage> preRelations = 
workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
-                processLineage.getProcessDefinitionCode(), 
processLineage.getPreTaskCode(), processLineage.getPreTaskVersion());
-            for (ProcessLineage preRelation : preRelations) {
-                preWorkFlowCodes.add(preRelation.getProcessDefinitionCode());
-            }
-            ProcessLineage postRelation = relations.get(0);
-            WorkFlowLineage post = 
workFlowLineageMapper.queryWorkFlowLineageByCode(postRelation.getProjectCode(), 
postRelation.getProcessDefinitionCode());
-            preWorkFlowCodes.remove(post.getWorkFlowCode());
-            if (!workFlowLineageMap.containsKey(post.getWorkFlowCode())) {
-                post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, 
","));
-                workFlowLineageMap.put(post.getWorkFlowCode(), post);
-            } else {
-                WorkFlowLineage workFlowLineage = 
workFlowLineageMap.get(post.getWorkFlowCode());
-                String sourceWorkFlowCode = 
workFlowLineage.getSourceWorkFlowCode();
-                if (StringUtils.isBlank(sourceWorkFlowCode)) {
-                    
post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
-                } else {
-                    if (!preWorkFlowCodes.isEmpty()) {
-                        
workFlowLineage.setSourceWorkFlowCode(sourceWorkFlowCode + "," + 
StringUtils.join(preWorkFlowCodes, ","));
+    private Set<Long> querySourceWorkFlowCodes(long projectCode, long 
workFlowCode, List<TaskDefinition> taskDefinitionList) {
+        Set<Long> sourceWorkFlowCodes = new HashSet<>();
+        List<TaskDefinitionLog> taskDefinitionLogs = 
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList);
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            if (taskDefinitionLog.getProjectCode() == projectCode) {
+                if 
(taskDefinitionLog.getTaskType().equals(TaskType.DEPENDENT.getDesc())) {
+                    DependentParameters dependentParameters = 
JSONUtils.parseObject(taskDefinitionLog.getDependence(), 
DependentParameters.class);
+                    if (dependentParameters != null) {
+                        List<DependentTaskModel> dependTaskList = 
dependentParameters.getDependTaskList();
+                        for (DependentTaskModel taskModel : dependTaskList) {
+                            List<DependentItem> dependItemList = 
taskModel.getDependItemList();
+                            for (DependentItem dependentItem : dependItemList) 
{
+                                if (dependentItem.getProjectCode() == 
projectCode && dependentItem.getDefinitionCode() != workFlowCode) {
+                                    
sourceWorkFlowCodes.add(dependentItem.getDefinitionCode());
+                                }
+                            }
+                        }
                     }
                 }
             }
-            if (preWorkFlowCodes.isEmpty()) {
-                workFlowRelations.add(new WorkFlowRelation(0L, 
post.getWorkFlowCode()));
-            } else {
-                for (long workFlowCode : preWorkFlowCodes) {
-                    workFlowRelations.add(new WorkFlowRelation(workFlowCode, 
post.getWorkFlowCode()));
-                }
-            }
         }
+        return sourceWorkFlowCodes;
     }
 }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
index 5bbe2ac..fc7eee3 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 
 import java.util.ArrayList;
@@ -57,6 +58,9 @@ public class WorkFlowLineageServiceTest {
     @Mock
     private ProjectMapper projectMapper;
 
+    @Mock
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     /**
      * get mock Project
      *
@@ -97,23 +101,22 @@ public class WorkFlowLineageServiceTest {
         processLineage.setProcessDefinitionVersion(1);
         processLineage.setProjectCode(1111L);
         processLineages.add(processLineage);
-
         WorkFlowLineage workFlowLineage = new WorkFlowLineage();
         workFlowLineage.setSourceWorkFlowCode("");
+        workFlowLineage.setWorkFlowCode(1111L);
+        List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
+        workFlowLineages.add(workFlowLineage);
 
         when(projectMapper.queryByCode(1L)).thenReturn(project);
         
when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages);
-        
when(workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(), 
processLineage.getProcessDefinitionCode(),
-            processLineage.getPostTaskCode(), 
processLineage.getPreTaskVersion())).thenReturn(processLineages);
-        
when(workFlowLineageMapper.queryWorkFlowLineageByCode(processLineage.getProjectCode(),
 processLineage.getProcessDefinitionCode()))
-            .thenReturn(workFlowLineage);
+        
when(workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages)).thenReturn(workFlowLineages);
 
         Map<String, Object> result = 
workFlowLineageService.queryWorkFlowLineage(1L);
 
         Map<String, Object> workFlowLists = (Map<String, Object>) 
result.get(Constants.DATA_LIST);
-        Collection<WorkFlowLineage> workFlowLineages = 
(Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
+        Collection<WorkFlowLineage> workFlowLineageList = 
(Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
         Set<WorkFlowRelation> workFlowRelations = (Set<WorkFlowRelation>) 
workFlowLists.get(Constants.WORKFLOW_RELATION_LIST);
-        Assert.assertTrue(workFlowLineages.size() > 0);
+        Assert.assertTrue(workFlowLineageList.size() > 0);
         Assert.assertTrue(workFlowRelations.size() > 0);
     }
 
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
index 7d6f7d3..196ac0b 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.model;
 
 import org.apache.dolphinscheduler.common.enums.DependResult;
@@ -23,16 +24,15 @@ import 
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
  * dependent item
  */
 public class DependentItem {
-
-    private Long definitionCode;
+    private long projectCode;
+    private long definitionCode;
     private String depTasks;
     private String cycle;
     private String dateValue;
     private DependResult dependResult;
     private ExecutionStatus status;
 
-
-    public String getKey(){
+    public String getKey() {
         return String.format("%d-%s-%s-%s",
                 getDefinitionCode(),
                 getDepTasks(),
@@ -40,11 +40,19 @@ public class DependentItem {
                 getDateValue());
     }
 
-    public Long getDefinitionCode() {
+    public long getProjectCode() {
+        return projectCode;
+    }
+
+    public void setProjectCode(long projectCode) {
+        this.projectCode = projectCode;
+    }
+
+    public long getDefinitionCode() {
         return definitionCode;
     }
 
-    public void setDefinitionCode(Long definitionCode) {
+    public void setDefinitionCode(long definitionCode) {
         this.definitionCode = definitionCode;
     }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
index 5b4d7d9..f89992e 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.entity;
 
 import java.util.Objects;
@@ -45,4 +46,22 @@ public class WorkFlowRelation {
         this.sourceWorkFlowCode = sourceWorkFlowCode;
         this.targetWorkFlowCode = targetWorkFlowCode;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        WorkFlowRelation that = (WorkFlowRelation) o;
+        return sourceWorkFlowCode == that.sourceWorkFlowCode
+            && targetWorkFlowCode == that.targetWorkFlowCode;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sourceWorkFlowCode, targetWorkFlowCode);
+    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index d4c7838..249e42a 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
@@ -44,6 +45,14 @@ public interface WorkFlowLineageMapper {
     WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long 
projectCode, @Param("workFlowCode") long workFlowCode);
 
     /**
+     * queryWorkFlowLineageByCode
+     *
+     * @param processLineages processLineages
+     * @return WorkFlowLineage list
+     */
+    List<WorkFlowLineage> 
queryWorkFlowLineageByLineage(@Param("processLineages") List<ProcessLineage> 
processLineages);
+
+    /**
      * queryProcessLineage
      *
      * @param projectCode projectCode
@@ -54,13 +63,10 @@ public interface WorkFlowLineageMapper {
     /**
      * queryCodeRelation
      *
-     * @param taskCode taskCode
-     * @param taskVersion taskVersion
+     * @param projectCode projectCode
      * @param processDefinitionCode processDefinitionCode
      * @return ProcessLineage list
      */
-    List<ProcessLineage> queryCodeRelation(@Param("projectCode") long 
projectCode,
-                                           @Param("processDefinitionCode") 
long processDefinitionCode,
-                                           @Param("taskCode") long taskCode,
-                                           @Param("taskVersion") int 
taskVersion);
+    List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long 
projectCode,
+                                                   
@Param("processDefinitionCode") long processDefinitionCode);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 366c4a6..638ac5d 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -42,6 +42,26 @@
         where tepd.project_code = #{projectCode} and tepd.code = 
#{workFlowCode}
     </select>
 
+    <select id="queryWorkFlowLineageByLineage" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+        select tepd.code as work_flow_code,tepd.name as work_flow_name,
+               "" as source_work_flow_code,
+                tepd.release_state as work_flow_publish_status,
+                tes.start_time as schedule_start_time,
+                tes.end_time as schedule_end_time,
+                tes.crontab as crontab,
+                tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where 1=1
+        <if test="processLineages != null and processLineages.size != 0">
+            and
+            <foreach collection="processLineages" index="index" item="item" 
open="(" separator=" or " close=")">
+                (tepd.project_code = #{item.projectCode}
+                and tepd.code = #{item.processDefinitionCode})
+            </foreach>
+        </if>
+    </select>
+
     <select id="queryProcessLineage" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
     select ptr.project_code,
         ptr.post_task_code,
@@ -56,7 +76,7 @@
         where pd.project_code = #{projectCode}
     </select>
 
-    <select id="queryCodeRelation" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+    <select id="queryProcessLineageByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
         select project_code,
                post_task_code,
                post_task_version,
@@ -67,7 +87,5 @@
          from t_ds_process_task_relation
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
-                and post_task_code = #{taskCode}
-                and post_task_version = #{taskVersion}
     </select>
 </mapper>
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index da1a617..fc6b090 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -125,8 +125,8 @@ public class WorkFlowLineageMapperTest {
     @Test
     public void testQueryCodeRelation() {
         ProcessTaskRelation processTaskRelation = 
insertOneProcessTaskRelation();
-        List<ProcessLineage> workFlowLineages = 
workFlowLineageMapper.queryCodeRelation(processTaskRelation.getProjectCode(),
-            processTaskRelation.getProcessDefinitionCode(), 
processTaskRelation.getPostTaskCode(), 
processTaskRelation.getPostTaskVersion());
+        List<ProcessLineage> workFlowLineages = 
workFlowLineageMapper.queryProcessLineageByCode(processTaskRelation.getProjectCode(),
+            processTaskRelation.getProcessDefinitionCode());
         Assert.assertNotEquals(workFlowLineages.size(), 0);
     }
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2abce01..5805b2d 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.DateInterval;
-import org.apache.dolphinscheduler.common.model.PreviousTaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;

Reply via email to