This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 07d6542 [Fix-6139][API] fix workFlowLineage relation (#6217)
07d6542 is described below
commit 07d6542febe1b4ad3f4ca3e69af654b2f6538933
Author: JinyLeeChina <[email protected]>
AuthorDate: Wed Sep 15 17:42:21 2021 +0800
[Fix-6139][API] fix workFlowLineage relation (#6217)
* fix bug of view-tree api
* fix bug of view-tree api
* fix ut
* fix ut
* fix master buildFlowDag error
* fix workFlowLineage relation
* fix ut
* fix ut
* fix ut
Co-authored-by: JinyLeeChina <[email protected]>
---
.../service/impl/WorkFlowLineageServiceImpl.java | 143 ++++++++++++++-------
.../api/service/WorkFlowLineageServiceTest.java | 17 ++-
.../common/model/DependentItem.java | 20 ++-
.../dao/entity/WorkFlowRelation.java | 19 +++
.../dao/mapper/WorkFlowLineageMapper.java | 18 ++-
.../dao/mapper/WorkFlowLineageMapper.xml | 24 +++-
.../dao/mapper/WorkFlowLineageMapperTest.java | 4 +-
.../service/process/ProcessService.java | 1 -
8 files changed, 178 insertions(+), 68 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index c37b141..d665a1a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -20,20 +20,32 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.curator.shaded.com.google.common.collect.Sets;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -50,6 +62,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
@Autowired
private ProjectMapper projectMapper;
+ @Autowired
+ private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
@Override
public Map<String, Object> queryWorkFlowLineageByName(long projectCode,
String workFlowName) {
Map<String, Object> result = new HashMap<>();
@@ -72,12 +87,47 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
return result;
}
- WorkFlowLineage workFlowLineage =
workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
- result.put(Constants.DATA_LIST, workFlowLineage);
+ Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
+ Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
+ Set<Long> sourceWorkFlowCodes = Sets.newHashSet(workFlowCode);
+ recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations,
sourceWorkFlowCodes);
+ Map<String, Object> workFlowLists = new HashMap<>();
+ workFlowLists.put(Constants.WORKFLOW_LIST,
workFlowLineagesMap.values());
+ workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
+ result.put(Constants.DATA_LIST, workFlowLists);
putMsg(result, Status.SUCCESS);
return result;
}
+ private void recursiveWorkFlow(long projectCode,
+ Map<Long, WorkFlowLineage>
workFlowLineagesMap,
+ Set<WorkFlowRelation> workFlowRelations,
+ Set<Long> sourceWorkFlowCodes) {
+ for (Long workFlowCode : sourceWorkFlowCodes) {
+ WorkFlowLineage workFlowLineage =
workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
+ workFlowLineagesMap.put(workFlowCode, workFlowLineage);
+ List<ProcessLineage> processLineages =
workFlowLineageMapper.queryProcessLineageByCode(projectCode, workFlowCode);
+ List<TaskDefinition> taskDefinitionList = new ArrayList<>();
+ for (ProcessLineage processLineage : processLineages) {
+ if (processLineage.getPreTaskCode() > 0) {
+ taskDefinitionList.add(new
TaskDefinition(processLineage.getPreTaskCode(),
processLineage.getPreTaskVersion()));
+ }
+ if (processLineage.getPostTaskCode() > 0) {
+ taskDefinitionList.add(new
TaskDefinition(processLineage.getPostTaskCode(),
processLineage.getPostTaskVersion()));
+ }
+ }
+ sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode,
workFlowCode, taskDefinitionList);
+ if (sourceWorkFlowCodes.isEmpty()) {
+ workFlowRelations.add(new WorkFlowRelation(0L, workFlowCode));
+ return;
+ } else {
+
workFlowLineagesMap.get(workFlowCode).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes,
Constants.COMMA));
+ sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new
WorkFlowRelation(code, workFlowCode)));
+ recursiveWorkFlow(projectCode, workFlowLineagesMap,
workFlowRelations, sourceWorkFlowCodes);
+ }
+ }
+ }
+
@Override
public Map<String, Object> queryWorkFlowLineage(long projectCode) {
Map<String, Object> result = new HashMap<>();
@@ -87,58 +137,65 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
return result;
}
List<ProcessLineage> processLineages =
workFlowLineageMapper.queryProcessLineage(projectCode);
-
- Map<Long, WorkFlowLineage> workFlowLineages = new HashMap<>();
+ Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
-
- for (ProcessLineage processLineage : processLineages) {
- getRelation(workFlowLineages, workFlowRelations, processLineage);
+ if (!processLineages.isEmpty()) {
+ List<WorkFlowLineage> workFlowLineages =
workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages);
+ workFlowLineagesMap =
workFlowLineages.stream().collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode,
workFlowLineage -> workFlowLineage));
+ Map<Long, List<TaskDefinition>> workFlowMap = new HashMap<>();
+ for (ProcessLineage processLineage : processLineages) {
+ workFlowMap.compute(processLineage.getProcessDefinitionCode(),
(k, v) -> {
+ if (v == null) {
+ v = new ArrayList<>();
+ }
+ if (processLineage.getPreTaskCode() > 0) {
+ v.add(new
TaskDefinition(processLineage.getPreTaskCode(),
processLineage.getPreTaskVersion()));
+ }
+ if (processLineage.getPostTaskCode() > 0) {
+ v.add(new
TaskDefinition(processLineage.getPostTaskCode(),
processLineage.getPostTaskVersion()));
+ }
+ return v;
+ });
+ }
+ for (Entry<Long, List<TaskDefinition>> workFlow :
workFlowMap.entrySet()) {
+ Set<Long> sourceWorkFlowCodes =
querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue());
+ if (sourceWorkFlowCodes.isEmpty()) {
+ workFlowRelations.add(new WorkFlowRelation(0L,
workFlow.getKey()));
+ } else {
+
workFlowLineagesMap.get(workFlow.getKey()).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes,
Constants.COMMA));
+ sourceWorkFlowCodes.forEach(code ->
workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey())));
+ }
+ }
}
-
Map<String, Object> workFlowLists = new HashMap<>();
- workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values());
+ workFlowLists.put(Constants.WORKFLOW_LIST,
workFlowLineagesMap.values());
workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
result.put(Constants.DATA_LIST, workFlowLists);
putMsg(result, Status.SUCCESS);
return result;
}
- private void getRelation(Map<Long, WorkFlowLineage> workFlowLineageMap,
- Set<WorkFlowRelation> workFlowRelations,
- ProcessLineage processLineage) {
- List<ProcessLineage> relations =
workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
- processLineage.getProcessDefinitionCode(),
processLineage.getPostTaskCode(), processLineage.getPostTaskVersion());
- if (!relations.isEmpty()) {
- Set<Long> preWorkFlowCodes = new HashSet<>();
- List<ProcessLineage> preRelations =
workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
- processLineage.getProcessDefinitionCode(),
processLineage.getPreTaskCode(), processLineage.getPreTaskVersion());
- for (ProcessLineage preRelation : preRelations) {
- preWorkFlowCodes.add(preRelation.getProcessDefinitionCode());
- }
- ProcessLineage postRelation = relations.get(0);
- WorkFlowLineage post =
workFlowLineageMapper.queryWorkFlowLineageByCode(postRelation.getProjectCode(),
postRelation.getProcessDefinitionCode());
- preWorkFlowCodes.remove(post.getWorkFlowCode());
- if (!workFlowLineageMap.containsKey(post.getWorkFlowCode())) {
- post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes,
","));
- workFlowLineageMap.put(post.getWorkFlowCode(), post);
- } else {
- WorkFlowLineage workFlowLineage =
workFlowLineageMap.get(post.getWorkFlowCode());
- String sourceWorkFlowCode =
workFlowLineage.getSourceWorkFlowCode();
- if (StringUtils.isBlank(sourceWorkFlowCode)) {
-
post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
- } else {
- if (!preWorkFlowCodes.isEmpty()) {
-
workFlowLineage.setSourceWorkFlowCode(sourceWorkFlowCode + "," +
StringUtils.join(preWorkFlowCodes, ","));
+ private Set<Long> querySourceWorkFlowCodes(long projectCode, long
workFlowCode, List<TaskDefinition> taskDefinitionList) {
+ Set<Long> sourceWorkFlowCodes = new HashSet<>();
+ List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList);
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ if (taskDefinitionLog.getProjectCode() == projectCode) {
+ if
(taskDefinitionLog.getTaskType().equals(TaskType.DEPENDENT.getDesc())) {
+ DependentParameters dependentParameters =
JSONUtils.parseObject(taskDefinitionLog.getDependence(),
DependentParameters.class);
+ if (dependentParameters != null) {
+ List<DependentTaskModel> dependTaskList =
dependentParameters.getDependTaskList();
+ for (DependentTaskModel taskModel : dependTaskList) {
+ List<DependentItem> dependItemList =
taskModel.getDependItemList();
+ for (DependentItem dependentItem : dependItemList)
{
+ if (dependentItem.getProjectCode() ==
projectCode && dependentItem.getDefinitionCode() != workFlowCode) {
+
sourceWorkFlowCodes.add(dependentItem.getDefinitionCode());
+ }
+ }
+ }
}
}
}
- if (preWorkFlowCodes.isEmpty()) {
- workFlowRelations.add(new WorkFlowRelation(0L,
post.getWorkFlowCode()));
- } else {
- for (long workFlowCode : preWorkFlowCodes) {
- workFlowRelations.add(new WorkFlowRelation(workFlowCode,
post.getWorkFlowCode()));
- }
- }
}
+ return sourceWorkFlowCodes;
}
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
index 5bbe2ac..fc7eee3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import java.util.ArrayList;
@@ -57,6 +58,9 @@ public class WorkFlowLineageServiceTest {
@Mock
private ProjectMapper projectMapper;
+ @Mock
+ private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
/**
* get mock Project
*
@@ -97,23 +101,22 @@ public class WorkFlowLineageServiceTest {
processLineage.setProcessDefinitionVersion(1);
processLineage.setProjectCode(1111L);
processLineages.add(processLineage);
-
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setSourceWorkFlowCode("");
+ workFlowLineage.setWorkFlowCode(1111L);
+ List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
+ workFlowLineages.add(workFlowLineage);
when(projectMapper.queryByCode(1L)).thenReturn(project);
when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages);
-
when(workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
processLineage.getProcessDefinitionCode(),
- processLineage.getPostTaskCode(),
processLineage.getPreTaskVersion())).thenReturn(processLineages);
-
when(workFlowLineageMapper.queryWorkFlowLineageByCode(processLineage.getProjectCode(),
processLineage.getProcessDefinitionCode()))
- .thenReturn(workFlowLineage);
+
when(workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages)).thenReturn(workFlowLineages);
Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineage(1L);
Map<String, Object> workFlowLists = (Map<String, Object>)
result.get(Constants.DATA_LIST);
- Collection<WorkFlowLineage> workFlowLineages =
(Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
+ Collection<WorkFlowLineage> workFlowLineageList =
(Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
Set<WorkFlowRelation> workFlowRelations = (Set<WorkFlowRelation>)
workFlowLists.get(Constants.WORKFLOW_RELATION_LIST);
- Assert.assertTrue(workFlowLineages.size() > 0);
+ Assert.assertTrue(workFlowLineageList.size() > 0);
Assert.assertTrue(workFlowRelations.size() > 0);
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
index 7d6f7d3..196ac0b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.enums.DependResult;
@@ -23,16 +24,15 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
* dependent item
*/
public class DependentItem {
-
- private Long definitionCode;
+ private long projectCode;
+ private long definitionCode;
private String depTasks;
private String cycle;
private String dateValue;
private DependResult dependResult;
private ExecutionStatus status;
-
- public String getKey(){
+ public String getKey() {
return String.format("%d-%s-%s-%s",
getDefinitionCode(),
getDepTasks(),
@@ -40,11 +40,19 @@ public class DependentItem {
getDateValue());
}
- public Long getDefinitionCode() {
+ public long getProjectCode() {
+ return projectCode;
+ }
+
+ public void setProjectCode(long projectCode) {
+ this.projectCode = projectCode;
+ }
+
+ public long getDefinitionCode() {
return definitionCode;
}
- public void setDefinitionCode(Long definitionCode) {
+ public void setDefinitionCode(long definitionCode) {
this.definitionCode = definitionCode;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
index 5b4d7d9..f89992e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.entity;
import java.util.Objects;
@@ -45,4 +46,22 @@ public class WorkFlowRelation {
this.sourceWorkFlowCode = sourceWorkFlowCode;
this.targetWorkFlowCode = targetWorkFlowCode;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WorkFlowRelation that = (WorkFlowRelation) o;
+ return sourceWorkFlowCode == that.sourceWorkFlowCode
+ && targetWorkFlowCode == that.targetWorkFlowCode;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sourceWorkFlowCode, targetWorkFlowCode);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index d4c7838..249e42a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
@@ -44,6 +45,14 @@ public interface WorkFlowLineageMapper {
WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long
projectCode, @Param("workFlowCode") long workFlowCode);
/**
+ * queryWorkFlowLineageByCode
+ *
+ * @param processLineages processLineages
+ * @return WorkFlowLineage list
+ */
+ List<WorkFlowLineage>
queryWorkFlowLineageByLineage(@Param("processLineages") List<ProcessLineage>
processLineages);
+
+ /**
* queryProcessLineage
*
* @param projectCode projectCode
@@ -54,13 +63,10 @@ public interface WorkFlowLineageMapper {
/**
* queryCodeRelation
*
- * @param taskCode taskCode
- * @param taskVersion taskVersion
+ * @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @return ProcessLineage list
*/
- List<ProcessLineage> queryCodeRelation(@Param("projectCode") long
projectCode,
- @Param("processDefinitionCode")
long processDefinitionCode,
- @Param("taskCode") long taskCode,
- @Param("taskVersion") int
taskVersion);
+ List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long
projectCode,
+
@Param("processDefinitionCode") long processDefinitionCode);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 366c4a6..638ac5d 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -42,6 +42,26 @@
where tepd.project_code = #{projectCode} and tepd.code =
#{workFlowCode}
</select>
+ <select id="queryWorkFlowLineageByLineage"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+ select tepd.code as work_flow_code,tepd.name as work_flow_name,
+ "" as source_work_flow_code,
+ tepd.release_state as work_flow_publish_status,
+ tes.start_time as schedule_start_time,
+ tes.end_time as schedule_end_time,
+ tes.crontab as crontab,
+ tes.release_state as schedule_publish_status
+ from t_ds_process_definition tepd
+ left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+ where 1=1
+ <if test="processLineages != null and processLineages.size != 0">
+ and
+ <foreach collection="processLineages" index="index" item="item"
open="(" separator=" or " close=")">
+ (tepd.project_code = #{item.projectCode}
+ and tepd.code = #{item.processDefinitionCode})
+ </foreach>
+ </if>
+ </select>
+
<select id="queryProcessLineage"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select ptr.project_code,
ptr.post_task_code,
@@ -56,7 +76,7 @@
where pd.project_code = #{projectCode}
</select>
- <select id="queryCodeRelation"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+ <select id="queryProcessLineageByCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select project_code,
post_task_code,
post_task_version,
@@ -67,7 +87,5 @@
from t_ds_process_task_relation
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
- and post_task_code = #{taskCode}
- and post_task_version = #{taskVersion}
</select>
</mapper>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index da1a617..fc6b090 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -125,8 +125,8 @@ public class WorkFlowLineageMapperTest {
@Test
public void testQueryCodeRelation() {
ProcessTaskRelation processTaskRelation =
insertOneProcessTaskRelation();
- List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryCodeRelation(processTaskRelation.getProjectCode(),
- processTaskRelation.getProcessDefinitionCode(),
processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion());
+ List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryProcessLineageByCode(processTaskRelation.getProjectCode(),
+ processTaskRelation.getProcessDefinitionCode());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2abce01..5805b2d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.DateInterval;
-import org.apache.dolphinscheduler.common.model.PreviousTaskNode;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;