This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 38f30b5 add null check when import process has sub-process (#1806)
38f30b5 is described below
commit 38f30b5c4d2d16e4e6e8d5cf36110518c88d52e9
Author: Yelli <[email protected]>
AuthorDate: Mon Jan 13 10:11:35 2020 +0800
add null check when import process has sub-process (#1806)
* fix bug: zk hasTask method NPE
* add retMap null check for AlertSender
* add null check when import process has sub-process
---
.../api/service/ProcessDefinitionService.java | 114 +++++++++++----------
1 file changed, 58 insertions(+), 56 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 2d462b3..5fe708c 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -802,66 +802,68 @@ public class ProcessDefinitionService extends
BaseDAGService {
JSONObject subParams =
JSONUtils.parseObject(taskNode.getString("params"));
Integer subProcessId =
subParams.getInteger("processDefinitionId");
ProcessDefinition subProcess =
processDefineMapper.queryByDefineId(subProcessId);
- String subProcessJson = subProcess.getProcessDefinitionJson();
- //check current project has sub process
- ProcessDefinition currentProjectSubProcess =
processDefineMapper.queryByDefineName(targetProject.getId(),
subProcess.getName());
-
- if (null == currentProjectSubProcess) {
- JSONArray subJsonArray = (JSONArray)
JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
-
- List<Object> subProcessList = subJsonArray.stream()
- .filter(item ->
checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
- .collect(Collectors.toList());
-
- if (CollectionUtils.isNotEmpty(subProcessList)) {
- importSubProcess(loginUser, targetProject,
subJsonArray, subProcessIdMap);
- //sub process processId correct
- if (!subProcessIdMap.isEmpty()) {
-
- for (Map.Entry<Integer, Integer> entry :
subProcessIdMap.entrySet()) {
- String oldSubProcessId =
"\"processDefinitionId\":" + entry.getKey();
- String newSubProcessId =
"\"processDefinitionId\":" + entry.getValue();
- subProcessJson =
subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
+ //check is sub process exist in db
+ if (null != subProcess) {
+ String subProcessJson =
subProcess.getProcessDefinitionJson();
+ //check current project has sub process
+ ProcessDefinition currentProjectSubProcess =
processDefineMapper.queryByDefineName(targetProject.getId(),
subProcess.getName());
+
+ if (null == currentProjectSubProcess) {
+ JSONArray subJsonArray = (JSONArray)
JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
+
+ List<Object> subProcessList = subJsonArray.stream()
+ .filter(item ->
checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
+ .collect(Collectors.toList());
+
+ if (CollectionUtils.isNotEmpty(subProcessList)) {
+ importSubProcess(loginUser, targetProject,
subJsonArray, subProcessIdMap);
+ //sub process processId correct
+ if (!subProcessIdMap.isEmpty()) {
+
+ for (Map.Entry<Integer, Integer> entry :
subProcessIdMap.entrySet()) {
+ String oldSubProcessId =
"\"processDefinitionId\":" + entry.getKey();
+ String newSubProcessId =
"\"processDefinitionId\":" + entry.getValue();
+ subProcessJson =
subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
+ }
+
+ subProcessIdMap.clear();
}
-
- subProcessIdMap.clear();
}
- }
- //if sub-process recursion
- Date now = new Date();
- //create sub process in target project
- ProcessDefinition processDefine = new ProcessDefinition();
- processDefine.setName(subProcess.getName());
- processDefine.setVersion(subProcess.getVersion());
-
processDefine.setReleaseState(subProcess.getReleaseState());
- processDefine.setProjectId(targetProject.getId());
- processDefine.setUserId(loginUser.getId());
- processDefine.setProcessDefinitionJson(subProcessJson);
- processDefine.setDescription(subProcess.getDescription());
- processDefine.setLocations(subProcess.getLocations());
- processDefine.setConnects(subProcess.getConnects());
- processDefine.setTimeout(subProcess.getTimeout());
- processDefine.setTenantId(subProcess.getTenantId());
-
processDefine.setGlobalParams(subProcess.getGlobalParams());
- processDefine.setCreateTime(now);
- processDefine.setUpdateTime(now);
- processDefine.setFlag(subProcess.getFlag());
- processDefine.setReceivers(subProcess.getReceivers());
- processDefine.setReceiversCc(subProcess.getReceiversCc());
- processDefineMapper.insert(processDefine);
-
- logger.info("create sub process, project: {}, process
name: {}", targetProject.getName(), processDefine.getName());
-
- //modify task node
- ProcessDefinition newSubProcessDefine =
processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
-
- if (null != newSubProcessDefine) {
- subProcessIdMap.put(subProcessId,
newSubProcessDefine.getId());
- subParams.put("processDefinitionId",
newSubProcessDefine.getId());
- taskNode.put("params", subParams);
+ //if sub-process recursion
+ Date now = new Date();
+ //create sub process in target project
+ ProcessDefinition processDefine = new
ProcessDefinition();
+ processDefine.setName(subProcess.getName());
+ processDefine.setVersion(subProcess.getVersion());
+
processDefine.setReleaseState(subProcess.getReleaseState());
+ processDefine.setProjectId(targetProject.getId());
+ processDefine.setUserId(loginUser.getId());
+ processDefine.setProcessDefinitionJson(subProcessJson);
+
processDefine.setDescription(subProcess.getDescription());
+ processDefine.setLocations(subProcess.getLocations());
+ processDefine.setConnects(subProcess.getConnects());
+ processDefine.setTimeout(subProcess.getTimeout());
+ processDefine.setTenantId(subProcess.getTenantId());
+
processDefine.setGlobalParams(subProcess.getGlobalParams());
+ processDefine.setCreateTime(now);
+ processDefine.setUpdateTime(now);
+ processDefine.setFlag(subProcess.getFlag());
+ processDefine.setReceivers(subProcess.getReceivers());
+
processDefine.setReceiversCc(subProcess.getReceiversCc());
+ processDefineMapper.insert(processDefine);
+
+ logger.info("create sub process, project: {}, process
name: {}", targetProject.getName(), processDefine.getName());
+
+ //modify task node
+ ProcessDefinition newSubProcessDefine =
processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
+
+ if (null != newSubProcessDefine) {
+ subProcessIdMap.put(subProcessId,
newSubProcessDefine.getId());
+ subParams.put("processDefinitionId",
newSubProcessDefine.getId());
+ taskNode.put("params", subParams);
+ }
}
-
}
}
}