This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 2fdeba0  [Feature][JsonSplit]modify import/export processdefinition, add genProcessData (#4864)
2fdeba0 is described below

commit 2fdeba09881c8aa5d0030d2ce1ed24adda86153e
Author: Simon <[email protected]>
AuthorDate: Wed Feb 24 18:55:45 2021 +0800

    [Feature][JsonSplit]modify import/export processdefinition, add genProcessData (#4864)
    
    * Modify Project and ProjectUser Mapper
    
    * Modify Project and ProjectUser Mapper
    
    * project_code is bigint(20)
    
    * modify ERROR name
    
    * modify saveProcessDefine, remove the duplicate code with createTaskAndRelation
    
    * modify import/export processdefinition, add genProcessData
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 132 +++++++++++----------
 .../service/impl/TaskDefinitionServiceImpl.java    |   9 +-
 .../service/process/ProcessService.java            |  14 +++
 3 files changed, 92 insertions(+), 63 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index f7ddf06..5abd71a 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -115,7 +115,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
     private static final Logger logger = 
LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class);
 
-    private static final String PROCESSDEFINITIONID = "processDefinitionId";
+    private static final String PROCESSDEFINITIONCODE = 
"processDefinitionCode";
 
     private static final String RELEASESTATE = "releaseState";
 
@@ -153,7 +153,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
     @Autowired
     TaskDefinitionLogMapper taskDefinitionLogMapper;
-    
+
     private SchedulerService schedulerService;
 
     /**
@@ -277,6 +277,12 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
 
         List<ProcessDefinition> resourceList = 
processDefinitionMapper.queryAllDefinitionList(project.getId());
+
+        resourceList.stream().forEach(processDefinition -> {
+            ProcessData processData = 
processService.genProcessData(processDefinition);
+            
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+        });
+
         result.put(Constants.DATA_LIST, resourceList);
         putMsg(result, Status.SUCCESS);
 
@@ -310,6 +316,13 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         IPage<ProcessDefinition> processDefinitionIPage = 
processDefinitionMapper.queryDefineListPaging(
                 page, searchVal, userId, project.getId(), isAdmin(loginUser));
 
+        List<ProcessDefinition> records = processDefinitionIPage.getRecords();
+        records.stream().forEach(processDefinition -> {
+            ProcessData processData = 
processService.genProcessData(processDefinition);
+            
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+        });
+        processDefinitionIPage.setRecords(records);
+
         PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, 
pageSize);
         pageInfo.setTotalCount((int) processDefinitionIPage.getTotal());
         pageInfo.setLists(processDefinitionIPage.getRecords());
@@ -340,6 +353,10 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
 
         ProcessDefinition processDefinition = 
processDefinitionMapper.selectById(processId);
+
+        ProcessData processData = 
processService.genProcessData(processDefinition);
+        
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+
         if (processDefinition == null) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
         } else {
@@ -362,6 +379,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
 
         ProcessDefinition processDefinition = 
processDefinitionMapper.queryByDefineName(project.getId(), 
processDefinitionName);
+        ProcessData processData = 
processService.genProcessData(processDefinition);
+        
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+
         if (processDefinition == null) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
processDefinitionName);
         } else {
@@ -534,8 +554,6 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             }
         }
 
-        // TODO: replace id to code
-        // ProcessDefinition processDefinition = 
processDefineMapper.deleteByCode(processDefinitionCode);
         int delete = processDefinitionMapper.deleteById(processDefinitionId);
         processTaskRelationMapper.deleteByCode(project.getCode(), 
processDefinition.getCode());
         if (delete > 0) {
@@ -657,9 +675,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             //get workflow info
             int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
             ProcessDefinition processDefinition = 
processDefinitionMapper.queryByDefineId(processDefinitionId);
-            if (null != processDefinition) {
-                
processDefinitionList.add(exportProcessMetaData(processDefinitionId, 
processDefinition));
-            }
+            String processDefinitionJson = 
JSONUtils.toJsonString(processService.genProcessData(processDefinition));
+            processDefinition.setProcessDefinitionJson(processDefinitionJson);
+            
processDefinitionList.add(exportProcessMetaData(processDefinitionId, 
processDefinition));
         }
 
         return processDefinitionList;
@@ -718,15 +736,16 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      * @return export process metadata string
      */
     public ProcessMeta exportProcessMetaData(Integer processDefinitionId, 
ProcessDefinition processDefinition) {
+        String processDefinitionJson = 
processDefinition.getProcessDefinitionJson();
         //correct task param which has data source or dependent param
-        String correctProcessDefinitionJson = 
addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
+        String correctProcessDefinitionJson = 
addExportTaskNodeSpecialParam(processDefinitionJson);
         
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
 
         //export process metadata
         ProcessMeta exportProcessMeta = new ProcessMeta();
         exportProcessMeta.setProjectName(processDefinition.getProjectName());
         
exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
-        
exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson());
+        exportProcessMeta.setProcessDefinitionJson(processDefinitionJson);
         
exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription());
         
exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
         
exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
@@ -963,15 +982,15 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             }
         }
 
-        //recursive sub-process parameter correction map key for old process 
id value for new process id
-        Map<Integer, Integer> subProcessIdMap = new HashMap<>();
+        //recursive sub-process parameter correction map key for old process 
code value for new process code
+        Map<Long, Long> subProcessCodeMap = new HashMap<>();
 
         List<Object> subProcessList = 
StreamUtils.asStream(jsonArray.elements())
                 .filter(elem -> 
checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText()))
                 .collect(Collectors.toList());
 
         if (CollectionUtils.isNotEmpty(subProcessList)) {
-            importSubProcess(loginUser, targetProject, jsonArray, 
subProcessIdMap);
+            importSubProcess(loginUser, targetProject, jsonArray, 
subProcessCodeMap);
         }
 
         jsonObject.set(TASKS, jsonArray);
@@ -1038,9 +1057,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      * @param loginUser login user
      * @param targetProject target project
      * @param jsonArray process task array
-     * @param subProcessIdMap correct sub process id map
+     * @param subProcessCodeMap correct sub process id map
      */
-    private void importSubProcess(User loginUser, Project targetProject, 
ArrayNode jsonArray, Map<Integer, Integer> subProcessIdMap) {
+    private void importSubProcess(User loginUser, Project targetProject, 
ArrayNode jsonArray, Map<Long, Long> subProcessCodeMap) {
         for (int i = 0; i < jsonArray.size(); i++) {
             ObjectNode taskNode = (ObjectNode) jsonArray.path(i);
             String taskType = taskNode.path("type").asText();
@@ -1050,68 +1069,59 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             }
             //get sub process info
             ObjectNode subParams = (ObjectNode) taskNode.path("params");
-            Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt();
-            ProcessDefinition subProcess = 
processDefinitionMapper.queryByDefineId(subProcessId);
+            Long subProcessCode = 
subParams.path(PROCESSDEFINITIONCODE).asLong();
+            ProcessDefinition subProcess = 
processDefinitionMapper.queryByCode(subProcessCode);
             //check is sub process exist in db
             if (null == subProcess) {
                 continue;
             }
-            String subProcessJson = subProcess.getProcessDefinitionJson();
+
+            String subProcessJson = 
JSONUtils.toJsonString(processService.genProcessData(subProcess));
             //check current project has sub process
             ProcessDefinition currentProjectSubProcess = 
processDefinitionMapper.queryByDefineName(targetProject.getId(), 
subProcess.getName());
 
             if (null == currentProjectSubProcess) {
-                ArrayNode subJsonArray = (ArrayNode) 
JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS);
+                ArrayNode subJsonArray = (ArrayNode) 
JSONUtils.parseObject(subProcessJson).get(TASKS);
 
                 List<Object> subProcessList = 
StreamUtils.asStream(subJsonArray.elements())
                         .filter(item -> 
checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText()))
                         .collect(Collectors.toList());
 
                 if (CollectionUtils.isNotEmpty(subProcessList)) {
-                    importSubProcess(loginUser, targetProject, subJsonArray, 
subProcessIdMap);
+                    importSubProcess(loginUser, targetProject, subJsonArray, 
subProcessCodeMap);
                     //sub process processId correct
-                    if (!subProcessIdMap.isEmpty()) {
+                    if (!subProcessCodeMap.isEmpty()) {
 
-                        for (Map.Entry<Integer, Integer> entry : 
subProcessIdMap.entrySet()) {
-                            String oldSubProcessId = 
"\"processDefinitionId\":" + entry.getKey();
-                            String newSubProcessId = 
"\"processDefinitionId\":" + entry.getValue();
-                            subProcessJson = 
subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
+                        for (Map.Entry<Long, Long> entry : 
subProcessCodeMap.entrySet()) {
+                            String oldSubProcessCode = 
"\"processDefinitionCode\":" + entry.getKey();
+                            String newSubProcessCode = 
"\"processDefinitionCode\":" + entry.getValue();
+                            subProcessJson = 
subProcessJson.replaceAll(oldSubProcessCode, newSubProcessCode);
                         }
 
-                        subProcessIdMap.clear();
+                        subProcessCodeMap.clear();
                     }
                 }
 
-                //if sub-process recursion
-                Date now = new Date();
-                //create sub process in target project
-                ProcessDefinition processDefine = new ProcessDefinition();
-                processDefine.setName(subProcess.getName());
-                processDefine.setVersion(subProcess.getVersion());
-                processDefine.setReleaseState(subProcess.getReleaseState());
-                processDefine.setProjectId(targetProject.getId());
-                processDefine.setUserId(loginUser.getId());
-                processDefine.setProcessDefinitionJson(subProcessJson);
-                processDefine.setDescription(subProcess.getDescription());
-                processDefine.setLocations(subProcess.getLocations());
-                processDefine.setConnects(subProcess.getConnects());
-                processDefine.setTimeout(subProcess.getTimeout());
-                processDefine.setTenantId(subProcess.getTenantId());
-                processDefine.setGlobalParams(subProcess.getGlobalParams());
-                processDefine.setCreateTime(now);
-                processDefine.setUpdateTime(now);
-                processDefine.setFlag(subProcess.getFlag());
-                
processDefine.setWarningGroupId(subProcess.getWarningGroupId());
-                processDefinitionMapper.insert(processDefine);
-
-                logger.info("create sub process, project: {}, process name: 
{}", targetProject.getName(), processDefine.getName());
+                try {
+                    createProcessDefinition(loginUser
+                            , targetProject.getName(),
+                            subProcess.getName(),
+                            subProcessJson,
+                            subProcess.getDescription(),
+                            subProcess.getLocations(),
+                            subProcess.getConnects());
+                    logger.info("create sub process, project: {}, process 
name: {}", targetProject.getName(), subProcess.getName());
+
+                } catch (Exception e) {
+                    logger.error("import process meta json data: {}", 
e.getMessage(), e);
+                }
 
                 //modify task node
-                ProcessDefinition newSubProcessDefine = 
processDefinitionMapper.queryByDefineName(processDefine.getProjectId(), 
processDefine.getName());
+                ProcessDefinition newSubProcessDefine = 
processDefinitionMapper.queryByDefineName(subProcess.getProjectId(), 
subProcess.getName());
 
                 if (null != newSubProcessDefine) {
-                    subProcessIdMap.put(subProcessId, 
newSubProcessDefine.getId());
-                    subParams.put(PROCESSDEFINITIONID, 
newSubProcessDefine.getId());
+                    subProcessCodeMap.put(subProcessCode, 
newSubProcessDefine.getCode());
+                    subParams.put(PROCESSDEFINITIONCODE, 
newSubProcessDefine.getId());
                     taskNode.set("params", subParams);
                 }
             }
@@ -1187,15 +1197,12 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId);
             return result;
         }
-
-        String processDefinitionJson = 
processDefinition.getProcessDefinitionJson();
-
-        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, 
ProcessData.class);
+        ProcessData processData = 
processService.genProcessData(processDefinition);
 
         //process data check
         if (null == processData) {
             logger.error("process data is null");
-            putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson);
+            putMsg(result, Status.DATA_IS_NOT_VALID, 
JSONUtils.toJsonString(processData));
             return result;
         }
 
@@ -1233,8 +1240,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
 
         for (ProcessDefinition processDefinition : processDefinitionList) {
-            String processDefinitionJson = 
processDefinition.getProcessDefinitionJson();
-            ProcessData processData = 
JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
+            ProcessData processData = 
processService.genProcessData(processDefinition);
             List<TaskNode> taskNodeList = (processData.getTasks() == null) ? 
new ArrayList<>() : processData.getTasks();
             taskNodeMap.put(processDefinition.getId(), taskNodeList);
         }
@@ -1258,6 +1264,10 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         HashMap<String, Object> result = new HashMap<>();
 
         List<ProcessDefinition> resourceList = 
processDefinitionMapper.queryAllDefinitionList(projectId);
+        resourceList.stream().forEach(processDefinition -> {
+            ProcessData processData = 
processService.genProcessData(processDefinition);
+            
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
+        });
         result.put(Constants.DATA_LIST, resourceList);
         putMsg(result, Status.SUCCESS);
 
@@ -1449,16 +1459,18 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
             return result;
         } else {
-            ProcessData processData = 
JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), 
ProcessData.class);
+            ProcessData processData = 
processService.genProcessData(processDefinition);
             List<TaskNode> taskNodeList = processData.getTasks();
             taskNodeList.stream().forEach(taskNode -> {
                 taskNode.setCode(0L);
             });
+            processData.setTasks(taskNodeList);
+            String processDefinitionJson = JSONUtils.toJsonString(processData);
             return createProcessDefinition(
                     loginUser,
                     targetProject.getName(),
                     processDefinition.getName() + "_copy_" + 
DateUtils.getCurrentTimeStamp(),
-                    processDefinition.getProcessDefinitionJson(),
+                    processDefinitionJson,
                     processDefinition.getDescription(),
                     processDefinition.getLocations(),
                     processDefinition.getConnects());
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 8e32133..9a52acb 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -17,8 +17,9 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
+
 import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.service.BaseService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.utils.CheckUtils;
@@ -102,7 +103,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
 
         TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, 
TaskNode.class);
         checkTaskNode(result, taskNode, taskDefinitionJson);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+        if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
+                || result.get(Constants.STATUS) == 
Status.PROCESS_NODE_S_PARAMETER_INVALID) {
             return result;
         }
         TaskDefinition taskDefinition = new TaskDefinition();
@@ -218,7 +220,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, 
TaskNode.class);
         checkTaskNode(result, taskNode, taskDefinitionJson);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+        if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
+                || result.get(Constants.STATUS) == 
Status.PROCESS_NODE_S_PARAMETER_INVALID) {
             return result;
         }
         int update = processService.updateTaskDefinition(loginUser, 
project.getCode(), taskNode, taskDefinition);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index abc0037..3eb6923 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2415,6 +2415,20 @@ public class ProcessService {
         return processTaskRelations;
     }
 
+    /**
+     * generate ProcessData (tasks, global params, tenant id, timeout) from a process definition; processDefinition is dereferenced without a null check, so it must not be null
+     */
+    public ProcessData genProcessData(ProcessDefinition processDefinition) {
+        List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode()
+                , processDefinition.getVersion());
+        ProcessData processData = new ProcessData();
+        processData.setTasks(taskNodes);
+        
processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(),
 Property.class));
+        processData.setTenantId(processDefinition.getTenantId());
+        processData.setTimeout(processDefinition.getTimeout());
+        return processData;
+    }
+
     public List<TaskNode> genTaskNodeList(Long processCode, int 
processVersion) {
         List<ProcessTaskRelation> processTaskRelations = 
this.getProcessTaskRelationList(processCode, processVersion);
         Set<TaskDefinition> taskDefinitionSet = new HashSet<>();

Reply via email to