This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4ddfb85 [Fix-6156] [API] refactor workflow lineage api (#6157)
4ddfb85 is described below
commit 4ddfb855a3d1fbdc9faa99c4f280d6163a937449
Author: JinyLeeChina <[email protected]>
AuthorDate: Fri Sep 10 14:20:18 2021 +0800
[Fix-6156] [API] refactor workflow lineage api (#6157)
* fix mysql create sentence bug
* fix mysql create sentence bug
* fix genTaskCodeList return same code and save proces error
* refactor workflow lineage api
Co-authored-by: JinyLeeChina <[email protected]>
---
.../api/controller/WorkFlowLineageController.java | 50 +++++-----
.../api/service/WorkFlowLineageService.java | 5 +-
.../service/impl/WorkFlowLineageServiceImpl.java | 109 ++++++++++++---------
.../controller/WorkFlowLineageControllerTest.java | 44 ++++++---
.../api/service/WorkFlowLineageServiceTest.java | 35 +++----
.../dao/entity/ProcessLineage.java | 53 +++++-----
.../dao/entity/WorkFlowLineage.java | 28 +++---
.../dao/entity/WorkFlowRelation.java | 38 +++----
.../dao/mapper/WorkFlowLineageMapper.java | 38 ++++---
.../dao/mapper/WorkFlowLineageMapper.xml | 67 ++++++-------
.../dao/mapper/WorkFlowLineageMapperTest.java | 29 ++----
11 files changed, 247 insertions(+), 249 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
index f32a280..4e88486 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
@@ -27,10 +27,8 @@ import
org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,23 +52,23 @@ import springfox.documentation.annotations.ApiIgnore;
*/
@Api(tags = "WORK_FLOW_LINEAGE_TAG")
@RestController
-@RequestMapping("lineages/{projectCode}")
+@RequestMapping("projects/{projectCode}/lineages")
public class WorkFlowLineageController extends BaseController {
private static final Logger logger =
LoggerFactory.getLogger(WorkFlowLineageController.class);
@Autowired
private WorkFlowLineageService workFlowLineageService;
- @ApiOperation(value = "queryWorkFlowLineageByName", notes =
"QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES")
- @GetMapping(value = "/list-name")
+ @ApiOperation(value = "queryLineageByWorkFlowName", notes =
"QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES")
+ @GetMapping(value = "/query-by-name")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<List<WorkFlowLineage>> queryWorkFlowLineageByName(@ApiIgnore
@RequestAttribute(value = SESSION_USER) User loginUser,
-
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true,
example = "1") @PathVariable long projectCode,
- @ApiIgnore
@RequestParam(value = "searchVal", required = false) String searchVal) {
+
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true)
@PathVariable long projectCode,
+
@RequestParam(value = "workFlowName", required = false) String workFlowName) {
try {
- searchVal = ParameterUtils.handleEscapes(searchVal);
- Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByName(searchVal, projectCode);
+ workFlowName = ParameterUtils.handleEscapes(workFlowName);
+ Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByName(projectCode, workFlowName);
return returnDataList(result);
} catch (Exception e) {
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);
@@ -78,24 +76,30 @@ public class WorkFlowLineageController extends
BaseController {
}
}
- @ApiOperation(value = "queryWorkFlowLineageByIds", notes =
"QUERY_WORKFLOW_LINEAGE_BY_IDS_NOTES")
- @GetMapping(value = "/list-ids")
+ @ApiOperation(value = "queryLineageByWorkFlowCode", notes =
"QUERY_WORKFLOW_LINEAGE_BY_CODES_NOTES")
+ @GetMapping(value = "/{workFlowCode}")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore
@RequestAttribute(value = SESSION_USER) User loginUser,
-
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true,
example = "1") @PathVariable long projectCode,
- @ApiIgnore
@RequestParam(value = "ids", required = false) String ids) {
+ public Result<Map<String, Object>> queryWorkFlowLineageByCode(@ApiIgnore
@RequestAttribute(value = SESSION_USER) User loginUser,
+
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true)
@PathVariable long projectCode,
+
@PathVariable(value = "workFlowCode", required = true) long workFlowCode) {
try {
- ids = ParameterUtils.handleEscapes(ids);
- Set<Integer> idsSet = new HashSet<>();
- if (ids != null) {
- String[] idsStr = ids.split(",");
- for (String id : idsStr) {
- idsSet.add(Integer.parseInt(id));
- }
- }
+ Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode);
+ return returnDataList(result);
+ } catch (Exception e) {
+ logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);
+ return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(),
QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
+ }
+ }
- Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByIds(idsSet, projectCode);
+ @ApiOperation(value = "queryWorkFlowList", notes =
"QUERY_WORKFLOW_LINEAGE_NOTES")
+ @GetMapping(value = "/list")
+ @ResponseStatus(HttpStatus.OK)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore
@RequestAttribute(value = SESSION_USER) User loginUser,
+
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true)
@PathVariable long projectCode) {
+ try {
+ Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineage(projectCode);
return returnDataList(result);
} catch (Exception e) {
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
index c88d352..64ea003 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
@@ -25,8 +25,9 @@ import java.util.Set;
*/
public interface WorkFlowLineageService {
- Map<String, Object> queryWorkFlowLineageByName(String workFlowName, long
projectCode);
+ Map<String, Object> queryWorkFlowLineageByName(long projectCode, String
workFlowName);
- Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, long
projectCode);
+ Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long
workFlowCode);
+ Map<String, Object> queryWorkFlowLineage(long projectCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index bcf7bdd..3273b7b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -50,65 +50,44 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
private ProjectMapper projectMapper;
@Override
- public Map<String, Object> queryWorkFlowLineageByName(String workFlowName,
long projectCode) {
- Project project = projectMapper.queryByCode(projectCode);
+ public Map<String, Object> queryWorkFlowLineageByName(long projectCode,
String workFlowName) {
Map<String, Object> result = new HashMap<>();
- List<WorkFlowLineage> workFlowLineageList =
workFlowLineageMapper.queryByName(workFlowName, project.getCode());
+ Project project = projectMapper.queryByCode(projectCode);
+ if (project == null) {
+ putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
+ return result;
+ }
+ List<WorkFlowLineage> workFlowLineageList =
workFlowLineageMapper.queryWorkFlowLineageByName(projectCode, workFlowName);
result.put(Constants.DATA_LIST, workFlowLineageList);
putMsg(result, Status.SUCCESS);
return result;
}
- private void getRelation(Map<Integer, WorkFlowLineage> workFlowLineageMap,
- Set<WorkFlowRelation> workFlowRelations,
- ProcessLineage processLineage) {
- List<ProcessLineage> relations =
workFlowLineageMapper.queryCodeRelation(
- processLineage.getPostTaskCode(),
processLineage.getPostTaskVersion(),
- processLineage.getProcessDefinitionCode(),
processLineage.getProjectCode());
- if (!relations.isEmpty()) {
- Set<Integer> preWorkFlowIds = new HashSet<>();
- List<ProcessLineage> preRelations =
workFlowLineageMapper.queryCodeRelation(
- processLineage.getPreTaskCode(),
processLineage.getPreTaskVersion(),
- processLineage.getProcessDefinitionCode(),
processLineage.getProjectCode());
- for (ProcessLineage preRelation : preRelations) {
- WorkFlowLineage pre =
workFlowLineageMapper.queryWorkFlowLineageByCode(
- preRelation.getProcessDefinitionCode(),
preRelation.getProjectCode());
- preWorkFlowIds.add(pre.getWorkFlowId());
- }
- ProcessLineage postRelation = relations.get(0);
- WorkFlowLineage post =
workFlowLineageMapper.queryWorkFlowLineageByCode(
- postRelation.getProcessDefinitionCode(),
postRelation.getProjectCode());
- if (!workFlowLineageMap.containsKey(post.getWorkFlowId())) {
- post.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds,
","));
- workFlowLineageMap.put(post.getWorkFlowId(), post);
- } else {
- WorkFlowLineage workFlowLineage =
workFlowLineageMap.get(post.getWorkFlowId());
- String sourceWorkFlowId =
workFlowLineage.getSourceWorkFlowId();
- if (sourceWorkFlowId.equals("")) {
-
workFlowLineage.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ","));
- } else {
- if (!preWorkFlowIds.isEmpty()) {
- workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId +
"," + StringUtils.join(preWorkFlowIds, ","));
- }
- }
- }
- if (preWorkFlowIds.isEmpty()) {
- workFlowRelations.add(new WorkFlowRelation(0,
post.getWorkFlowId()));
- } else {
- for (Integer workFlowId : preWorkFlowIds) {
- workFlowRelations.add(new WorkFlowRelation(workFlowId,
post.getWorkFlowId()));
- }
- }
+ @Override
+ public Map<String, Object> queryWorkFlowLineageByCode(long projectCode,
long workFlowCode) {
+ Map<String, Object> result = new HashMap<>();
+ Project project = projectMapper.queryByCode(projectCode);
+ if (project == null) {
+ putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
+ return result;
}
+ WorkFlowLineage workFlowLineage =
workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
+ result.put(Constants.DATA_LIST, workFlowLineage);
+ putMsg(result, Status.SUCCESS);
+ return result;
}
@Override
- public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids,
long projectCode) {
+ public Map<String, Object> queryWorkFlowLineage(long projectCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
- List<ProcessLineage> processLineages =
workFlowLineageMapper.queryRelationByIds(ids, project.getCode());
+ if (project == null) {
+ putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
+ return result;
+ }
+ List<ProcessLineage> processLineages =
workFlowLineageMapper.queryProcessLineage(projectCode);
- Map<Integer, WorkFlowLineage> workFlowLineages = new HashMap<>();
+ Map<Long, WorkFlowLineage> workFlowLineages = new HashMap<>();
Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
for (ProcessLineage processLineage : processLineages) {
@@ -123,4 +102,42 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
return result;
}
+ private void getRelation(Map<Long, WorkFlowLineage> workFlowLineageMap,
+ Set<WorkFlowRelation> workFlowRelations,
+ ProcessLineage processLineage) {
+ List<ProcessLineage> relations =
workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
+ processLineage.getProcessDefinitionCode(),
processLineage.getPostTaskCode(), processLineage.getPostTaskVersion());
+ if (!relations.isEmpty()) {
+ Set<Long> preWorkFlowCodes = new HashSet<>();
+ List<ProcessLineage> preRelations =
workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
+ processLineage.getProcessDefinitionCode(),
processLineage.getPreTaskCode(), processLineage.getPreTaskVersion());
+ for (ProcessLineage preRelation : preRelations) {
+ preWorkFlowCodes.add(preRelation.getProcessDefinitionCode());
+ }
+ ProcessLineage postRelation = relations.get(0);
+ WorkFlowLineage post =
workFlowLineageMapper.queryWorkFlowLineageByCode(postRelation.getProjectCode(),
postRelation.getProcessDefinitionCode());
+ preWorkFlowCodes.remove(post.getWorkFlowCode());
+ if (!workFlowLineageMap.containsKey(post.getWorkFlowCode())) {
+ post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes,
","));
+ workFlowLineageMap.put(post.getWorkFlowCode(), post);
+ } else {
+ WorkFlowLineage workFlowLineage =
workFlowLineageMap.get(post.getWorkFlowCode());
+ String sourceWorkFlowCode =
workFlowLineage.getSourceWorkFlowCode();
+ if (StringUtils.isBlank(sourceWorkFlowCode)) {
+
post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
+ } else {
+ if (!preWorkFlowCodes.isEmpty()) {
+
workFlowLineage.setSourceWorkFlowCode(sourceWorkFlowCode + "," +
StringUtils.join(preWorkFlowCodes, ","));
+ }
+ }
+ }
+ if (preWorkFlowCodes.isEmpty()) {
+ workFlowRelations.add(new WorkFlowRelation(0L,
post.getWorkFlowCode()));
+ } else {
+ for (long workFlowCode : preWorkFlowCodes) {
+ workFlowRelations.add(new WorkFlowRelation(workFlowCode,
post.getWorkFlowCode()));
+ }
+ }
+ }
+ }
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java
index 8c413b2..b5422b3 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java
@@ -21,22 +21,27 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.dao.entity.User;
+import java.text.MessageFormat;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
/**
* work flow lineage controller test
*/
-public class WorkFlowLineageControllerTest extends AbstractControllerTest {
+@RunWith(MockitoJUnitRunner.class)
+public class WorkFlowLineageControllerTest {
@InjectMocks
private WorkFlowLineageController workFlowLineageController;
@@ -44,6 +49,26 @@ public class WorkFlowLineageControllerTest extends
AbstractControllerTest {
@Mock
private WorkFlowLineageServiceImpl workFlowLineageService;
+ protected User user;
+
+ @Before
+ public void before() {
+ User loginUser = new User();
+ loginUser.setId(1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+ loginUser.setUserName("admin");
+ user = loginUser;
+ }
+
+ private void putMsg(Map<String, Object> result, Status status, Object...
statusParams) {
+ result.put(Constants.STATUS, status);
+ if (statusParams != null && statusParams.length > 0) {
+ result.put(Constants.MSG, MessageFormat.format(status.getMsg(),
statusParams));
+ } else {
+ result.put(Constants.MSG, status.getMsg());
+ }
+ }
+
@Test
public void testQueryWorkFlowLineageByName() {
long projectCode = 1L;
@@ -51,23 +76,20 @@ public class WorkFlowLineageControllerTest extends
AbstractControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
-
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(searchVal,
projectCode)).thenReturn(result);
+
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(projectCode,
searchVal)).thenReturn(result);
Result response =
workFlowLineageController.queryWorkFlowLineageByName(user, projectCode,
searchVal);
Assert.assertEquals(Status.SUCCESS.getCode(),
response.getCode().intValue());
}
@Test
- public void testQueryWorkFlowLineageByIds() {
+ public void testQueryWorkFlowLineageByCode() {
long projectCode = 1L;
- String ids = "1";
+ long code = 1L;
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
- Set<Integer> idSet = new HashSet<>();
- idSet.add(1);
- Mockito.when(workFlowLineageService.queryWorkFlowLineageByIds(idSet,
projectCode)).thenReturn(result);
- Result response =
workFlowLineageController.queryWorkFlowLineageByIds(user, projectCode, ids);
+
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode,
code)).thenReturn(result);
+ Result response =
workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code);
Assert.assertEquals(Status.SUCCESS.getCode(),
response.getCode().intValue());
}
-
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
index c43a9ad3..5bbe2ac 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
@@ -30,7 +30,6 @@ import
org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -76,20 +75,16 @@ public class WorkFlowLineageServiceTest {
@Test
public void testQueryWorkFlowLineageByName() {
Project project = getProject("test");
- String searchVal = "test";
+ String name = "test";
when(projectMapper.queryByCode(1L)).thenReturn(project);
- when(workFlowLineageMapper.queryByName(Mockito.any(),
Mockito.any())).thenReturn(getWorkFlowLineages());
- Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByName(searchVal, 1L);
+
when(workFlowLineageMapper.queryWorkFlowLineageByName(Mockito.anyLong(),
Mockito.any())).thenReturn(getWorkFlowLineages());
+ Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByName(1L, name);
List<WorkFlowLineage> workFlowLineageList = (List<WorkFlowLineage>)
result.get(Constants.DATA_LIST);
Assert.assertTrue(workFlowLineageList.size() > 0);
}
@Test
- public void testQueryWorkFlowLineageByIds() {
- Set<Integer> ids = new HashSet<>();
- ids.add(1);
- ids.add(2);
-
+ public void testQueryWorkFlowLineage() {
Project project = getProject("test");
List<ProcessLineage> processLineages = new ArrayList<>();
@@ -104,20 +99,16 @@ public class WorkFlowLineageServiceTest {
processLineages.add(processLineage);
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
- workFlowLineage.setSourceWorkFlowId("");
+ workFlowLineage.setSourceWorkFlowCode("");
when(projectMapper.queryByCode(1L)).thenReturn(project);
- when(workFlowLineageMapper.queryRelationByIds(ids,
project.getCode())).thenReturn(processLineages);
-
when(workFlowLineageMapper.queryCodeRelation(processLineage.getPostTaskCode()
- , processLineage.getPreTaskVersion()
- , processLineage.getProcessDefinitionCode()
- , processLineage.getProjectCode()))
- .thenReturn(processLineages);
- when(workFlowLineageMapper
-
.queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(),
processLineage.getProjectCode()))
- .thenReturn(workFlowLineage);
-
- Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineageByIds(ids, 1L);
+
when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages);
+
when(workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
processLineage.getProcessDefinitionCode(),
+ processLineage.getPostTaskCode(),
processLineage.getPreTaskVersion())).thenReturn(processLineages);
+
when(workFlowLineageMapper.queryWorkFlowLineageByCode(processLineage.getProjectCode(),
processLineage.getProcessDefinitionCode()))
+ .thenReturn(workFlowLineage);
+
+ Map<String, Object> result =
workFlowLineageService.queryWorkFlowLineage(1L);
Map<String, Object> workFlowLists = (Map<String, Object>)
result.get(Constants.DATA_LIST);
Collection<WorkFlowLineage> workFlowLineages =
(Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
@@ -129,7 +120,7 @@ public class WorkFlowLineageServiceTest {
private List<WorkFlowLineage> getWorkFlowLineages() {
List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
- workFlowLineage.setWorkFlowId(1);
+ workFlowLineage.setWorkFlowCode(1);
workFlowLineage.setWorkFlowName("testdag");
workFlowLineages.add(workFlowLineage);
return workFlowLineages;
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
index 678db42..27d0f8c 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
@@ -25,12 +25,12 @@ public class ProcessLineage {
/**
* project code
*/
- private Long projectCode;
+ private long projectCode;
/**
* post task code
*/
- private Long postTaskCode;
+ private long postTaskCode;
/**
* post task version
@@ -40,7 +40,7 @@ public class ProcessLineage {
/**
* pre task code
*/
- private Long preTaskCode;
+ private long preTaskCode;
/**
* pre task version
@@ -50,46 +50,42 @@ public class ProcessLineage {
/**
* process definition code
*/
- private Long processDefinitionCode;
+ private long processDefinitionCode;
/**
* process definition version
*/
private int processDefinitionVersion;
- public Long getProjectCode() {
+ public long getProjectCode() {
return projectCode;
}
- public void setProjectCode(Long projectCode) {
+ public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
- public Long getProcessDefinitionCode() {
- return processDefinitionCode;
- }
-
- public void setProcessDefinitionCode(Long processDefinitionCode) {
- this.processDefinitionCode = processDefinitionCode;
+ public long getPostTaskCode() {
+ return postTaskCode;
}
- public int getProcessDefinitionVersion() {
- return processDefinitionVersion;
+ public void setPostTaskCode(long postTaskCode) {
+ this.postTaskCode = postTaskCode;
}
- public void setProcessDefinitionVersion(int processDefinitionVersion) {
- this.processDefinitionVersion = processDefinitionVersion;
+ public int getPostTaskVersion() {
+ return postTaskVersion;
}
- public void setPostTaskCode(Long postTaskCode) {
- this.postTaskCode = postTaskCode;
+ public void setPostTaskVersion(int postTaskVersion) {
+ this.postTaskVersion = postTaskVersion;
}
- public Long getPreTaskCode() {
+ public long getPreTaskCode() {
return preTaskCode;
}
- public void setPreTaskCode(Long preTaskCode) {
+ public void setPreTaskCode(long preTaskCode) {
this.preTaskCode = preTaskCode;
}
@@ -101,20 +97,19 @@ public class ProcessLineage {
this.preTaskVersion = preTaskVersion;
}
- public int getPostTaskVersion() {
- return postTaskVersion;
+ public long getProcessDefinitionCode() {
+ return processDefinitionCode;
}
- public void setPostTaskVersion(int postTaskVersion) {
- this.postTaskVersion = postTaskVersion;
+ public void setProcessDefinitionCode(long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
}
- public long getPostTaskCode() {
- return postTaskCode;
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
}
- public void setPostTaskCode(long postTaskCode) {
- this.postTaskCode = postTaskCode;
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
}
-
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java
index 6c2d9c3..b3fc56c 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java
@@ -19,29 +19,21 @@ package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
public class WorkFlowLineage {
- private int workFlowId;
+ private long workFlowCode;
private String workFlowName;
private String workFlowPublishStatus;
private Date scheduleStartTime;
private Date scheduleEndTime;
private String crontab;
private int schedulePublishStatus;
- private String sourceWorkFlowId;
+ private String sourceWorkFlowCode;
- public String getSourceWorkFlowId() {
- return sourceWorkFlowId;
+ public long getWorkFlowCode() {
+ return workFlowCode;
}
- public void setSourceWorkFlowId(String sourceWorkFlowId) {
- this.sourceWorkFlowId = sourceWorkFlowId;
- }
-
- public int getWorkFlowId() {
- return workFlowId;
- }
-
- public void setWorkFlowId(int workFlowId) {
- this.workFlowId = workFlowId;
+ public void setWorkFlowCode(long workFlowCode) {
+ this.workFlowCode = workFlowCode;
}
public String getWorkFlowName() {
@@ -91,4 +83,12 @@ public class WorkFlowLineage {
public void setSchedulePublishStatus(int schedulePublishStatus) {
this.schedulePublishStatus = schedulePublishStatus;
}
+
+ public String getSourceWorkFlowCode() {
+ return sourceWorkFlowCode;
+ }
+
+ public void setSourceWorkFlowCode(String sourceWorkFlowCode) {
+ this.sourceWorkFlowCode = sourceWorkFlowCode;
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
index d41bba5..5b4d7d9 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
@@ -19,42 +19,30 @@ package org.apache.dolphinscheduler.dao.entity;
import java.util.Objects;
public class WorkFlowRelation {
- private int sourceWorkFlowId;
- private int targetWorkFlowId;
+ private long sourceWorkFlowCode;
+ private long targetWorkFlowCode;
- public int getSourceWorkFlowId() {
- return sourceWorkFlowId;
+ public long getSourceWorkFlowCode() {
+ return sourceWorkFlowCode;
}
- public void setSourceWorkFlowId(int sourceWorkFlowId) {
- this.sourceWorkFlowId = sourceWorkFlowId;
+ public void setSourceWorkFlowCode(long sourceWorkFlowCode) {
+ this.sourceWorkFlowCode = sourceWorkFlowCode;
}
- public int getTargetWorkFlowId() {
- return targetWorkFlowId;
+ public long getTargetWorkFlowCode() {
+ return targetWorkFlowCode;
}
- public void setTargetWorkFlowId(int targetWorkFlowId) {
- this.targetWorkFlowId = targetWorkFlowId;
+ public void setTargetWorkFlowCode(long targetWorkFlowCode) {
+ this.targetWorkFlowCode = targetWorkFlowCode;
}
public WorkFlowRelation() {
}
- public WorkFlowRelation(int sourceWorkFlowId, int targetWorkFlowId) {
- this.sourceWorkFlowId = sourceWorkFlowId;
- this.targetWorkFlowId = targetWorkFlowId;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof WorkFlowRelation
- && this.sourceWorkFlowId == ((WorkFlowRelation)
obj).getSourceWorkFlowId()
- && this.targetWorkFlowId == ((WorkFlowRelation)
obj).getTargetWorkFlowId();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(sourceWorkFlowId, targetWorkFlowId);
+ public WorkFlowRelation(long sourceWorkFlowCode, long targetWorkFlowCode) {
+ this.sourceWorkFlowCode = sourceWorkFlowCode;
+ this.targetWorkFlowCode = targetWorkFlowCode;
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index 026e8bf..d4c7838 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -22,47 +22,45 @@ import
org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.ibatis.annotations.Param;
import java.util.List;
-import java.util.Set;
public interface WorkFlowLineageMapper {
/**
* queryByName
*
- * @param searchVal searchVal
* @param projectCode projectCode
+ * @param workFlowName workFlowName
* @return WorkFlowLineage list
*/
- List<WorkFlowLineage> queryByName(@Param("searchVal") String searchVal,
@Param("projectCode") Long projectCode);
+ List<WorkFlowLineage> queryWorkFlowLineageByName(@Param("projectCode")
long projectCode, @Param("workFlowName") String workFlowName);
/**
- * queryCodeRelation
+ * queryWorkFlowLineageByCode
*
- * @param taskCode taskCode
- * @param taskVersion taskVersion
- * @param processDefinitionCode processDefinitionCode
- * @return ProcessLineage
+ * @param projectCode projectCode
+ * @param workFlowCode workFlowCode
+ * @return WorkFlowLineage
*/
- List<ProcessLineage> queryCodeRelation(
- @Param("taskCode") Long taskCode, @Param("taskVersion") int
taskVersion,
- @Param("processDefinitionCode") Long processDefinitionCode,
@Param("projectCode") Long projectCode);
+ WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long
projectCode, @Param("workFlowCode") long workFlowCode);
/**
- * queryRelationByIds
+ * queryProcessLineage
*
- * @param ids ids
* @param projectCode projectCode
- * @return ProcessLineage
+ * @return ProcessLineage list
*/
- List<ProcessLineage> queryRelationByIds(@Param("ids") Set<Integer> ids,
@Param("projectCode") Long projectCode);
+ List<ProcessLineage> queryProcessLineage(@Param("projectCode") long
projectCode);
/**
- * queryWorkFlowLineageByCode
+ * queryCodeRelation
*
+ * @param taskCode taskCode
+ * @param taskVersion taskVersion
* @param processDefinitionCode processDefinitionCode
- * @param projectCode projectCode
- * @return WorkFlowLineage
+ * @return ProcessLineage list
*/
- WorkFlowLineage queryWorkFlowLineageByCode(@Param("processDefinitionCode")
Long processDefinitionCode, @Param("projectCode") Long projectCode);
-
+ List<ProcessLineage> queryCodeRelation(@Param("projectCode") long
projectCode,
+ @Param("processDefinitionCode")
long processDefinitionCode,
+ @Param("taskCode") long taskCode,
+ @Param("taskVersion") int
taskVersion);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 7d1dbfb..366c4a6 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -19,34 +19,41 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper">
- <select id="queryByName"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
- select tepd.id as work_flow_id,tepd.name as work_flow_name
+ <select id="queryWorkFlowLineageByName"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+ select tepd.code as work_flow_code,tepd.name as work_flow_name
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode}
- <if test="searchVal != null and searchVal != ''">
- and tepd.name like concat('%', #{searchVal}, '%')
+ <if test="workFlowName != null and workFlowName != ''">
+ and tepd.name like concat('%', #{workFlowName}, '%')
</if>
</select>
- <select id="queryRelationByIds"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
- select ptr.project_code,
- ptr.post_task_code,
- ptr.post_task_version,
- ptr.pre_task_code,
- ptr.pre_task_version,
- ptr.process_definition_code,
- ptr.process_definition_version
- from t_ds_process_definition pd
- join t_ds_process_task_relation ptr on pd.code =
ptr.process_definition_code and pd.version =
+ <select id="queryWorkFlowLineageByCode"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
+ select tepd.code as work_flow_code,tepd.name as work_flow_name,
+ "" as source_work_flow_code,
+ tepd.release_state as work_flow_publish_status,
+ tes.start_time as schedule_start_time,
+ tes.end_time as schedule_end_time,
+ tes.crontab as crontab,
+ tes.release_state as schedule_publish_status
+ from t_ds_process_definition tepd
+ left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+ where tepd.project_code = #{projectCode} and tepd.code =
#{workFlowCode}
+ </select>
+
+ <select id="queryProcessLineage"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+ select ptr.project_code,
+ ptr.post_task_code,
+ ptr.post_task_version,
+ ptr.pre_task_code,
+ ptr.pre_task_version,
+ ptr.process_definition_code,
ptr.process_definition_version
+ from t_ds_process_definition pd
+ join t_ds_process_task_relation ptr on pd.code =
ptr.process_definition_code
+ and pd.version = ptr.process_definition_version
where pd.project_code = #{projectCode}
- <if test="ids != null and ids.size()>0">
- and pd.id in
- <foreach collection="ids" index="index" item="i" open="("
separator="," close=")">
- #{i}
- </foreach>
- </if>
</select>
<select id="queryCodeRelation"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
@@ -58,23 +65,9 @@
process_definition_code,
process_definition_version
from t_ds_process_task_relation
- where post_task_code = #{taskCode}
- and post_task_version = #{taskVersion}
+ where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
- and project_code = #{projectCode}
- </select>
-
- <select id="queryWorkFlowLineageByCode"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
- select tepd.id as work_flow_id,tepd.name as work_flow_name,
- "" as source_work_flow_id,
- tepd.release_state as work_flow_publish_status,
- tes.start_time as schedule_start_time,
- tes.end_time as schedule_end_time,
- tes.crontab as crontab,
- tes.release_state as schedule_publish_status
- from t_ds_process_definition tepd
- left join t_ds_schedules tes on tepd.code = tes.process_definition_code
- where tepd.project_code = #{projectCode} and tepd.code =
#{processDefinitionCode}
+ and post_task_code = #{taskCode}
+ and post_task_version = #{taskVersion}
</select>
-
</mapper>
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index cc60ce4..da1a617 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import org.junit.Assert;
@@ -81,9 +80,8 @@ public class WorkFlowLineageMapperTest {
/**
* insert
*
- * @return ProcessDefinition
*/
- private ProcessDefinition insertOneProcessDefinition() {
+ private void insertOneProcessDefinition() {
//insertOne
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
@@ -93,15 +91,13 @@ public class WorkFlowLineageMapperTest {
processDefinition.setUpdateTime(new Date());
processDefinition.setCreateTime(new Date());
processDefinitionMapper.insert(processDefinition);
- return processDefinition;
}
/**
* insert
*
- * @return Schedule
*/
- private Schedule insertOneSchedule(int id) {
+ private void insertOneSchedule(int id) {
//insertOne
Schedule schedule = new Schedule();
schedule.setStartTime(new Date());
@@ -114,38 +110,32 @@ public class WorkFlowLineageMapperTest {
schedule.setUpdateTime(new Date());
schedule.setProcessDefinitionCode(id);
scheduleMapper.insert(schedule);
- return schedule;
}
@Test
- public void testQueryByName() {
+ public void testQueryWorkFlowLineageByName() {
insertOneProcessDefinition();
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(1L);
insertOneSchedule(processDefinition.getId());
- List<WorkFlowLineage> workFlowLineages =
workFlowLineageMapper.queryByName(processDefinition.getName(),
processDefinition.getProjectCode());
+ List<WorkFlowLineage> workFlowLineages =
workFlowLineageMapper.queryWorkFlowLineageByName(processDefinition.getProjectCode(),
processDefinition.getName());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@Test
public void testQueryCodeRelation() {
ProcessTaskRelation processTaskRelation =
insertOneProcessTaskRelation();
-
- List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryCodeRelation(
- processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion(),
- processTaskRelation.getProcessDefinitionCode(),
processTaskRelation.getProjectCode());
+ List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryCodeRelation(processTaskRelation.getProjectCode(),
+ processTaskRelation.getProcessDefinitionCode(),
processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@Test
- public void testQueryRelationByIds() {
+ public void testQueryWorkFlowLineage() {
insertOneProcessDefinition();
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(1L);
insertOneProcessTaskRelation();
-
- HashSet<Integer> set = new HashSet<>();
- set.add(processDefinition.getId());
- List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryRelationByIds(set,
processDefinition.getProjectCode());
+ List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryProcessLineage(processDefinition.getProjectCode());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@@ -154,8 +144,7 @@ public class WorkFlowLineageMapperTest {
insertOneProcessDefinition();
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(1L);
insertOneSchedule(processDefinition.getId());
-
- WorkFlowLineage workFlowLineages =
workFlowLineageMapper.queryWorkFlowLineageByCode(processDefinition.getCode(),
processDefinition.getProjectCode());
+ WorkFlowLineage workFlowLineages =
workFlowLineageMapper.queryWorkFlowLineageByCode(processDefinition.getProjectCode(),
processDefinition.getCode());
Assert.assertNotNull(workFlowLineages);
}