This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new db96bf2  [Feature][JsonSplit] Fix batchMove of processDefinition bug (#5371)
db96bf2 is described below

commit db96bf2dfece57e75db6238eda13206fe335e7de
Author: JinyLeeChina <[email protected]>
AuthorDate: Sat Apr 24 15:33:23 2021 +0800

    [Feature][JsonSplit] Fix batchMove of processDefinition bug (#5371)
    
    * update SnowFlake
    
    * update processDefinite from processInstance
    
    * update processDefinite from processInstance
    
    * Fix task logger path
    
    * Fix dependTask bug
    
    * Fix batchMove of processDefinition bug
    
    * codeStyle
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 116 ++++++++++-----------
 .../service/process/ProcessService.java            |  54 ++--------
 2 files changed, 65 insertions(+), 105 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 75fe58a..41b97d8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -155,6 +156,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Autowired
     TaskDefinitionLogMapper taskDefinitionLogMapper;
 
+    @Autowired
     private SchedulerService schedulerService;
 
     /**
@@ -563,7 +565,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 // To check resources whether they are already cancel 
authorized or deleted
                 String resourceIds = processDefinition.getResourceIds();
                 if (StringUtils.isNotBlank(resourceIds)) {
-                    Integer[] resourceIdArray = 
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
+                    Integer[] resourceIdArray = 
Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
                     PermissionCheck<Integer> permissionCheck = new 
PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, 
resourceIdArray, loginUser.getId(), logger);
                     try {
                         permissionCheck.checkPermission();
@@ -1463,8 +1465,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             return checkResult;
         }
 
-        // TODO:
-        //  Project targetProject = 
projectMapper.queryDetailByCode(targetProjectCode);
         Project targetProject = projectMapper.queryDetailById(targetProjectId);
         if (targetProject == null) {
             putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@@ -1501,7 +1501,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                           int targetProjectId) 
{
         Map<String, Object> result = new HashMap<>();
         List<String> failedProcessList = new ArrayList<>();
-
         //check src project auth
         Map<String, Object> checkResult = checkProjectAndAuth(loginUser, 
projectName);
         if (checkResult != null) {
@@ -1513,8 +1512,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             return result;
         }
 
-        // TODO :
-        //  Project targetProject = 
projectMapper.queryDetailByCode(targetProjectCode);
         Project targetProject = projectMapper.queryDetailById(targetProjectId);
         if (targetProject == null) {
             putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@@ -1528,14 +1525,63 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             }
         }
 
-        String[] processDefinitionIdList = 
processDefinitionIds.split(Constants.COMMA);
-        doBatchMoveProcessDefinition(targetProject, failedProcessList, 
processDefinitionIdList);
+        Integer[] definitionIds = 
Arrays.stream(processDefinitionIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
+        List<ProcessDefinition> processDefinitionList = 
processDefinitionMapper.queryDefinitionListByIdList(definitionIds);
+        for (ProcessDefinition processDefinition : processDefinitionList) {
+            ProcessDefinitionLog processDefinitionLog = 
moveProcessDefinition(loginUser, targetProject.getCode(), processDefinition, 
result, failedProcessList);
+            if (processDefinitionLog != null) {
+                moveTaskRelation(loginUser, 
processDefinition.getProjectCode(), processDefinitionLog);
+            }
+        }
 
         checkBatchOperateResult(projectName, targetProject.getName(), result, 
failedProcessList, false);
-
         return result;
     }
 
+    private ProcessDefinitionLog moveProcessDefinition(User loginUser, Long 
targetProjectCode, ProcessDefinition processDefinition,
+                                                       Map<String, Object> 
result, List<String> failedProcessList) {
+        try {
+            Integer version = 
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+            ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+            processDefinitionLog.setVersion(version == null || version == 0 ? 
1 : version + 1);
+            processDefinitionLog.setProjectCode(targetProjectCode);
+            processDefinitionLog.setOperator(loginUser.getId());
+            Date now = new Date();
+            processDefinitionLog.setOperateTime(now);
+            processDefinitionLog.setUpdateTime(now);
+            processDefinitionLog.setCreateTime(now);
+            int update = 
processDefinitionMapper.updateById(processDefinitionLog);
+            int insertLog = 
processDefinitionLogMapper.insert(processDefinitionLog);
+            if ((insertLog & update) > 0) {
+                putMsg(result, Status.SUCCESS);
+            } else {
+                failedProcessList.add(processDefinition.getId() + "[" + 
processDefinition.getName() + "]");
+                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
+            return processDefinitionLog;
+        } catch (Exception e) {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            failedProcessList.add(processDefinition.getId() + "[" + 
processDefinition.getName() + "]");
+            logger.error("move processDefinition error: {}", e.getMessage(), 
e);
+        }
+        return null;
+    }
+
+    private void moveTaskRelation(User loginUser, Long projectCode, 
ProcessDefinitionLog processDefinition) {
+        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinition.getCode());
+        if (!processTaskRelationList.isEmpty()) {
+            processTaskRelationMapper.deleteByCode(projectCode, 
processDefinition.getCode());
+        }
+        Date now = new Date();
+        for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
+            
processTaskRelation.setProjectCode(processDefinition.getProjectCode());
+            
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
+            processTaskRelation.setCreateTime(now);
+            processTaskRelation.setUpdateTime(now);
+            processService.saveTaskRelation(loginUser, processTaskRelation);
+        }
+    }
+
     /**
      * switch the defined process definition verison
      *
@@ -1586,30 +1632,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     /**
-     * do batch move process definition
-     *
-     * @param targetProject targetProject
-     * @param failedProcessList failedProcessList
-     * @param processDefinitionIdList processDefinitionIdList
-     */
-    private void doBatchMoveProcessDefinition(Project targetProject, 
List<String> failedProcessList, String[] processDefinitionIdList) {
-        for (String processDefinitionId : processDefinitionIdList) {
-            try {
-                Map<String, Object> moveProcessDefinitionResult =
-                        
moveProcessDefinition(Integer.valueOf(processDefinitionId), targetProject);
-                if 
(!Status.SUCCESS.equals(moveProcessDefinitionResult.get(Constants.STATUS))) {
-                    setFailedProcessList(failedProcessList, 
processDefinitionId);
-                    logger.error((String) 
moveProcessDefinitionResult.get(Constants.MSG));
-                }
-            } catch (Exception e) {
-                setFailedProcessList(failedProcessList, processDefinitionId);
-                logger.error("move processDefinition error: {}", 
e.getMessage(), e);
-
-            }
-        }
-    }
-
-    /**
      * batch copy process definition
      *
      * @param loginUser loginUser
@@ -1669,34 +1691,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     /**
-     * move process definition
-     *
-     * @param processId processId
-     * @param targetProject targetProject
-     * @return move result code
-     */
-    private Map<String, Object> moveProcessDefinition(Integer processId,
-                                                      Project targetProject) {
-
-        Map<String, Object> result = new HashMap<>();
-
-        ProcessDefinition processDefinition = 
processDefinitionMapper.selectById(processId);
-        if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
-            return result;
-        }
-
-        processDefinition.setProjectId(targetProject.getId());
-        processDefinition.setUpdateTime(new Date());
-        if (processDefinitionMapper.updateById(processDefinition) > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-        }
-        return result;
-    }
-
-    /**
      * check batch operate result
      *
      * @param srcProjectName srcProjectName
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7433824..7868348 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2370,16 +2370,20 @@ public class ProcessService {
             }
         }
         for (ProcessTaskRelation processTaskRelation : builderRelationList) {
-            processTaskRelationMapper.insert(processTaskRelation);
-            // save process task relation log
-            ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(processTaskRelation);
-            processTaskRelationLog.setOperator(operator.getId());
-            processTaskRelationLog.setOperateTime(now);
-            processTaskRelationLogMapper.insert(processTaskRelationLog);
+            saveTaskRelation(operator, processTaskRelation);
         }
         return 0;
     }
 
+    public void saveTaskRelation(User operator, ProcessTaskRelation 
processTaskRelation) {
+        processTaskRelationMapper.insert(processTaskRelation);
+        // save process task relation log
+        ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(processTaskRelation);
+        processTaskRelationLog.setOperator(operator.getId());
+        processTaskRelationLog.setOperateTime(new Date());
+        processTaskRelationLogMapper.insert(processTaskRelationLog);
+    }
+
     public int saveTaskDefinition(User operator, Long projectCode, TaskNode 
taskNode, TaskDefinition taskDefinition) {
         Date now = new Date();
         taskDefinition.setProjectCode(projectCode);
@@ -2499,44 +2503,6 @@ public class ProcessService {
     }
 
     /**
-     * getTaskNodeFromTaskInstance
-     * return null if task definition do not exists
-     *
-     * @param taskInstance
-     * @return
-     */
-    public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) {
-        TaskNode taskNode = new TaskNode();
-        ProcessInstance processInstance = 
processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
-        TaskDefinition taskDefinition = 
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
-                taskInstance.getTaskCode(),
-                taskInstance.getTaskDefinitionVersion());
-        if (taskDefinition == null) {
-            return null;
-        }
-        List<ProcessTaskRelationLog> taskRelationList = 
processTaskRelationLogMapper.queryByProcessCodeAndVersion(
-                processInstance.getProcessDefinitionCode(), 
processInstance.getProcessDefinitionVersion()
-        );
-        Map<Long, Integer> taskCodeMap = new HashedMap();
-
-        taskRelationList.forEach(relation -> 
taskCodeMap.putIfAbsent(relation.getPostTaskCode(), 
relation.getPostTaskVersion()));
-
-        taskNode.setCode(taskDefinition.getCode());
-        taskNode.setVersion(taskDefinition.getVersion());
-        taskNode.setName(taskDefinition.getName());
-        taskNode.setName(taskDefinition.getName());
-        taskNode.setDesc(taskDefinition.getDescription());
-        taskNode.setType(taskDefinition.getTaskType());
-        taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? 
Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL);
-        taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
-        taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());
-        taskNode.setParams(taskDefinition.getTaskParams());
-        taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority());
-        taskNode.setWorkerGroup(taskDefinition.getWorkerGroup());
-        return taskNode;
-    }
-
-    /**
      * find task definition by code and verision
      *
      * @param taskCode

Reply via email to