This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 2fdeba0 [Feature][JsonSplit] modify import/export processdefinition, add genProcessData (#4864)
2fdeba0 is described below
commit 2fdeba09881c8aa5d0030d2ce1ed24adda86153e
Author: Simon <[email protected]>
AuthorDate: Wed Feb 24 18:55:45 2021 +0800
[Feature][JsonSplit] modify import/export processdefinition, add genProcessData (#4864)
* Modify Project and ProjectUser Mapper
* Modify Project and ProjectUser Mapper
* project_code is bigint(20)
* modify ERROR name
* modify saveProcessDefine: remove the code duplicated with createTaskAndRelation
* modify import/export processdefinition, add genProcessData
---
.../service/impl/ProcessDefinitionServiceImpl.java | 132 +++++++++++----------
.../service/impl/TaskDefinitionServiceImpl.java | 9 +-
.../service/process/ProcessService.java | 14 +++
3 files changed, 92 insertions(+), 63 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index f7ddf06..5abd71a 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -115,7 +115,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
private static final Logger logger =
LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class);
- private static final String PROCESSDEFINITIONID = "processDefinitionId";
+ private static final String PROCESSDEFINITIONCODE =
"processDefinitionCode";
private static final String RELEASESTATE = "releaseState";
@@ -153,7 +153,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
-
+
private SchedulerService schedulerService;
/**
@@ -277,6 +277,12 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
List<ProcessDefinition> resourceList =
processDefinitionMapper.queryAllDefinitionList(project.getId());
+
+ resourceList.stream().forEach(processDefinition -> {
+ ProcessData processData =
processService.genProcessData(processDefinition);
+
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+ });
+
result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS);
@@ -310,6 +316,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
IPage<ProcessDefinition> processDefinitionIPage =
processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getId(), isAdmin(loginUser));
+ List<ProcessDefinition> records = processDefinitionIPage.getRecords();
+ records.stream().forEach(processDefinition -> {
+ ProcessData processData =
processService.genProcessData(processDefinition);
+
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+ });
+ processDefinitionIPage.setRecords(records);
+
PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo,
pageSize);
pageInfo.setTotalCount((int) processDefinitionIPage.getTotal());
pageInfo.setLists(processDefinitionIPage.getRecords());
@@ -340,6 +353,10 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.selectById(processId);
+
+ ProcessData processData =
processService.genProcessData(processDefinition);
+
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
@@ -362,6 +379,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getId(),
processDefinitionName);
+ ProcessData processData =
processService.genProcessData(processDefinition);
+
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
processDefinitionName);
} else {
@@ -534,8 +554,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
}
- // TODO: replace id to code
- // ProcessDefinition processDefinition =
processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefinitionMapper.deleteById(processDefinitionId);
processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode());
if (delete > 0) {
@@ -657,9 +675,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
//get workflow info
int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineId(processDefinitionId);
- if (null != processDefinition) {
-
processDefinitionList.add(exportProcessMetaData(processDefinitionId,
processDefinition));
- }
+ String processDefinitionJson =
JSONUtils.toJsonString(processService.genProcessData(processDefinition));
+ processDefinition.setProcessDefinitionJson(processDefinitionJson);
+
processDefinitionList.add(exportProcessMetaData(processDefinitionId,
processDefinition));
}
return processDefinitionList;
@@ -718,15 +736,16 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* @return export process metadata string
*/
public ProcessMeta exportProcessMetaData(Integer processDefinitionId,
ProcessDefinition processDefinition) {
+ String processDefinitionJson =
processDefinition.getProcessDefinitionJson();
//correct task param which has data source or dependent param
- String correctProcessDefinitionJson =
addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
+ String correctProcessDefinitionJson =
addExportTaskNodeSpecialParam(processDefinitionJson);
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
//export process metadata
ProcessMeta exportProcessMeta = new ProcessMeta();
exportProcessMeta.setProjectName(processDefinition.getProjectName());
exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
-
exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson());
+ exportProcessMeta.setProcessDefinitionJson(processDefinitionJson);
exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription());
exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
@@ -963,15 +982,15 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
}
- //recursive sub-process parameter correction map key for old process
id value for new process id
- Map<Integer, Integer> subProcessIdMap = new HashMap<>();
+ //recursive sub-process parameter correction map key for old process
code value for new process code
+ Map<Long, Long> subProcessCodeMap = new HashMap<>();
List<Object> subProcessList =
StreamUtils.asStream(jsonArray.elements())
.filter(elem ->
checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText()))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(subProcessList)) {
- importSubProcess(loginUser, targetProject, jsonArray,
subProcessIdMap);
+ importSubProcess(loginUser, targetProject, jsonArray,
subProcessCodeMap);
}
jsonObject.set(TASKS, jsonArray);
@@ -1038,9 +1057,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* @param loginUser login user
* @param targetProject target project
* @param jsonArray process task array
- * @param subProcessIdMap correct sub process id map
+ * @param subProcessCodeMap correct sub process id map
*/
- private void importSubProcess(User loginUser, Project targetProject,
ArrayNode jsonArray, Map<Integer, Integer> subProcessIdMap) {
+ private void importSubProcess(User loginUser, Project targetProject,
ArrayNode jsonArray, Map<Long, Long> subProcessCodeMap) {
for (int i = 0; i < jsonArray.size(); i++) {
ObjectNode taskNode = (ObjectNode) jsonArray.path(i);
String taskType = taskNode.path("type").asText();
@@ -1050,68 +1069,59 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
//get sub process info
ObjectNode subParams = (ObjectNode) taskNode.path("params");
- Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt();
- ProcessDefinition subProcess =
processDefinitionMapper.queryByDefineId(subProcessId);
+ Long subProcessCode =
subParams.path(PROCESSDEFINITIONCODE).asLong();
+ ProcessDefinition subProcess =
processDefinitionMapper.queryByCode(subProcessCode);
//check is sub process exist in db
if (null == subProcess) {
continue;
}
- String subProcessJson = subProcess.getProcessDefinitionJson();
+
+ String subProcessJson =
JSONUtils.toJsonString(processService.genProcessData(subProcess));
//check current project has sub process
ProcessDefinition currentProjectSubProcess =
processDefinitionMapper.queryByDefineName(targetProject.getId(),
subProcess.getName());
if (null == currentProjectSubProcess) {
- ArrayNode subJsonArray = (ArrayNode)
JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS);
+ ArrayNode subJsonArray = (ArrayNode)
JSONUtils.parseObject(subProcessJson).get(TASKS);
List<Object> subProcessList =
StreamUtils.asStream(subJsonArray.elements())
.filter(item ->
checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText()))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(subProcessList)) {
- importSubProcess(loginUser, targetProject, subJsonArray,
subProcessIdMap);
+ importSubProcess(loginUser, targetProject, subJsonArray,
subProcessCodeMap);
//sub process processId correct
- if (!subProcessIdMap.isEmpty()) {
+ if (!subProcessCodeMap.isEmpty()) {
- for (Map.Entry<Integer, Integer> entry :
subProcessIdMap.entrySet()) {
- String oldSubProcessId =
"\"processDefinitionId\":" + entry.getKey();
- String newSubProcessId =
"\"processDefinitionId\":" + entry.getValue();
- subProcessJson =
subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
+ for (Map.Entry<Long, Long> entry :
subProcessCodeMap.entrySet()) {
+ String oldSubProcessCode =
"\"processDefinitionCode\":" + entry.getKey();
+ String newSubProcessCode =
"\"processDefinitionCode\":" + entry.getValue();
+ subProcessJson =
subProcessJson.replaceAll(oldSubProcessCode, newSubProcessCode);
}
- subProcessIdMap.clear();
+ subProcessCodeMap.clear();
}
}
- //if sub-process recursion
- Date now = new Date();
- //create sub process in target project
- ProcessDefinition processDefine = new ProcessDefinition();
- processDefine.setName(subProcess.getName());
- processDefine.setVersion(subProcess.getVersion());
- processDefine.setReleaseState(subProcess.getReleaseState());
- processDefine.setProjectId(targetProject.getId());
- processDefine.setUserId(loginUser.getId());
- processDefine.setProcessDefinitionJson(subProcessJson);
- processDefine.setDescription(subProcess.getDescription());
- processDefine.setLocations(subProcess.getLocations());
- processDefine.setConnects(subProcess.getConnects());
- processDefine.setTimeout(subProcess.getTimeout());
- processDefine.setTenantId(subProcess.getTenantId());
- processDefine.setGlobalParams(subProcess.getGlobalParams());
- processDefine.setCreateTime(now);
- processDefine.setUpdateTime(now);
- processDefine.setFlag(subProcess.getFlag());
-
processDefine.setWarningGroupId(subProcess.getWarningGroupId());
- processDefinitionMapper.insert(processDefine);
-
- logger.info("create sub process, project: {}, process name:
{}", targetProject.getName(), processDefine.getName());
+ try {
+ createProcessDefinition(loginUser
+ , targetProject.getName(),
+ subProcess.getName(),
+ subProcessJson,
+ subProcess.getDescription(),
+ subProcess.getLocations(),
+ subProcess.getConnects());
+ logger.info("create sub process, project: {}, process
name: {}", targetProject.getName(), subProcess.getName());
+
+ } catch (Exception e) {
+ logger.error("import process meta json data: {}",
e.getMessage(), e);
+ }
//modify task node
- ProcessDefinition newSubProcessDefine =
processDefinitionMapper.queryByDefineName(processDefine.getProjectId(),
processDefine.getName());
+ ProcessDefinition newSubProcessDefine =
processDefinitionMapper.queryByDefineName(subProcess.getProjectId(),
subProcess.getName());
if (null != newSubProcessDefine) {
- subProcessIdMap.put(subProcessId,
newSubProcessDefine.getId());
- subParams.put(PROCESSDEFINITIONID,
newSubProcessDefine.getId());
+ subProcessCodeMap.put(subProcessCode,
newSubProcessDefine.getCode());
+ subParams.put(PROCESSDEFINITIONCODE,
newSubProcessDefine.getId());
taskNode.set("params", subParams);
}
}
@@ -1187,15 +1197,12 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId);
return result;
}
-
- String processDefinitionJson =
processDefinition.getProcessDefinitionJson();
-
- ProcessData processData = JSONUtils.parseObject(processDefinitionJson,
ProcessData.class);
+ ProcessData processData =
processService.genProcessData(processDefinition);
//process data check
if (null == processData) {
logger.error("process data is null");
- putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson);
+ putMsg(result, Status.DATA_IS_NOT_VALID,
JSONUtils.toJsonString(processData));
return result;
}
@@ -1233,8 +1240,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
for (ProcessDefinition processDefinition : processDefinitionList) {
- String processDefinitionJson =
processDefinition.getProcessDefinitionJson();
- ProcessData processData =
JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
+ ProcessData processData =
processService.genProcessData(processDefinition);
List<TaskNode> taskNodeList = (processData.getTasks() == null) ?
new ArrayList<>() : processData.getTasks();
taskNodeMap.put(processDefinition.getId(), taskNodeList);
}
@@ -1258,6 +1264,10 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
HashMap<String, Object> result = new HashMap<>();
List<ProcessDefinition> resourceList =
processDefinitionMapper.queryAllDefinitionList(projectId);
+ resourceList.stream().forEach(processDefinition -> {
+ ProcessData processData =
processService.genProcessData(processDefinition);
+
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+ });
result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS);
@@ -1449,16 +1459,18 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result;
} else {
- ProcessData processData =
JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(),
ProcessData.class);
+ ProcessData processData =
processService.genProcessData(processDefinition);
List<TaskNode> taskNodeList = processData.getTasks();
taskNodeList.stream().forEach(taskNode -> {
taskNode.setCode(0L);
});
+ processData.setTasks(taskNodeList);
+ String processDefinitionJson = JSONUtils.toJsonString(processData);
return createProcessDefinition(
loginUser,
targetProject.getName(),
processDefinition.getName() + "_copy_" +
DateUtils.getCurrentTimeStamp(),
- processDefinition.getProcessDefinitionJson(),
+ processDefinitionJson,
processDefinition.getDescription(),
processDefinition.getLocations(),
processDefinition.getConnects());
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 8e32133..9a52acb 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
+
import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
@@ -102,7 +103,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson,
TaskNode.class);
checkTaskNode(result, taskNode, taskDefinitionJson);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
+ || result.get(Constants.STATUS) ==
Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result;
}
TaskDefinition taskDefinition = new TaskDefinition();
@@ -218,7 +220,8 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson,
TaskNode.class);
checkTaskNode(result, taskNode, taskDefinitionJson);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
+ || result.get(Constants.STATUS) ==
Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result;
}
int update = processService.updateTaskDefinition(loginUser,
project.getCode(), taskNode, taskDefinition);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index abc0037..3eb6923 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2415,6 +2415,20 @@ public class ProcessService {
return processTaskRelations;
}
+ /**
+ * generate ProcessData (tasks from genTaskNodeList by code and version, global params, tenant id, timeout) from a process definition; the argument is dereferenced without a null check
+ */
+ public ProcessData genProcessData(ProcessDefinition processDefinition) {
+ List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode()
+ , processDefinition.getVersion());
+ ProcessData processData = new ProcessData();
+ processData.setTasks(taskNodes);
+
processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(),
Property.class));
+ processData.setTenantId(processDefinition.getTenantId());
+ processData.setTimeout(processDefinition.getTimeout());
+ return processData;
+ }
+
public List<TaskNode> genTaskNodeList(Long processCode, int
processVersion) {
List<ProcessTaskRelation> processTaskRelations =
this.getProcessTaskRelationList(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();