This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1144360  add convert dependent/conditions (#6710)
1144360 is described below

commit 1144360257cc901c70962971a0777930dcb39e3c
Author: JinYong Li <[email protected]>
AuthorDate: Fri Nov 5 23:02:46 2021 +0800

    add convert dependent/conditions (#6710)
---
 .../dolphinscheduler/dao/upgrade/UpgradeDao.java   | 110 ++++++++++++++++++---
 1 file changed, 99 insertions(+), 11 deletions(-)

diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index a7acafc..235f223 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
@@ -54,6 +55,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import javax.sql.DataSource;
@@ -61,6 +63,8 @@ import javax.sql.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -599,7 +603,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
             List<ProcessDefinitionLog> processDefinitionLogs = new 
ArrayList<>();
             List<ProcessTaskRelationLog> processTaskRelationLogs = new 
ArrayList<>();
             List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
-            splitProcessDefinitionJson(processDefinitions, 
processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, 
taskDefinitionLogs);
+            Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new 
HashMap<>();
+            splitProcessDefinitionJson(processDefinitions, 
processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, 
taskDefinitionLogs, processTaskMap);
+            convertDependence(taskDefinitionLogs, projectIdCodeMap, 
processTaskMap);
 
             // execute json split
             
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), 
processDefinitionLogs);
@@ -614,7 +620,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                                             Map<Integer, String> 
processDefinitionJsonMap,
                                             List<ProcessDefinitionLog> 
processDefinitionLogs,
                                             List<ProcessTaskRelationLog> 
processTaskRelationLogs,
-                                            List<TaskDefinitionLog> 
taskDefinitionLogs) throws Exception {
+                                            List<TaskDefinitionLog> 
taskDefinitionLogs,
+                                            Map<Integer, Map<Long, Map<String, 
Long>>> processTaskMap) throws Exception {
         Map<Integer, ProcessDefinition> processDefinitionMap = 
processDefinitions.stream()
                 .collect(Collectors.toMap(ProcessDefinition::getId, 
processDefinition -> processDefinition));
         Date now = new Date();
@@ -634,6 +641,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
             Map<String, Long> taskIdCodeMap = new HashMap<>();
             Map<String, List<String>> taskNamePreMap = new HashMap<>();
             Map<String, Long> taskNameCodeMap = new HashMap<>();
+            Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new 
HashMap<>();
+            List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
             ArrayNode tasks = 
JSONUtils.parseArray(jsonObject.get("tasks").toString());
             for (int i = 0; i < tasks.size(); i++) {
                 ObjectNode task = (ObjectNode) tasks.path(i);
@@ -647,7 +656,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                     }
                     param.put("conditionResult", task.get("conditionResult"));
                     param.put("dependence", task.get("dependence"));
-                    taskDefinitionLog.setTaskParams(param.toString());
+                    // param is a container node: asText() would yield "" for it, so serialize the whole object
+                    taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
                 }
                 TaskTimeoutParameter timeout = 
JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), 
TaskTimeoutParameter.class);
                 if (timeout != null) {
@@ -655,15 +664,15 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                     taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? 
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
                     
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
                 }
-                
taskDefinitionLog.setDescription(task.get("description").toString());
-                
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").toString())
 ? Flag.YES : Flag.NO);
-                taskDefinitionLog.setTaskType(task.get("type").toString());
+                
taskDefinitionLog.setDescription(task.get("description").asText());
+                
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText())
 ? Flag.YES : Flag.NO);
+                taskDefinitionLog.setTaskType(task.get("type").asText());
                 
taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt());
                 
taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt());
                 
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")),
 Priority.class));
-                String name = task.get("name").toString();
+                String name = task.get("name").asText();
                 taskDefinitionLog.setName(name);
-                
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").toString());
+                
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
                 long taskCode = SnowFlakeUtils.getInstance().nextId();
                 taskDefinitionLog.setCode(taskCode);
                 taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
@@ -675,12 +684,14 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                 taskDefinitionLog.setOperateTime(now);
                 taskDefinitionLog.setCreateTime(now);
                 taskDefinitionLog.setUpdateTime(now);
-                taskDefinitionLogs.add(taskDefinitionLog);
-                taskIdCodeMap.put(task.get("id").toString(), taskCode);
-                List<String> preTasks = 
JSONUtils.toList(task.get("preTasks").toString(), String.class);
+                taskDefinitionLogList.add(taskDefinitionLog);
+                taskIdCodeMap.put(task.get("id").asText(), taskCode);
+                // asText() is "" for an array node, which would silently drop every predecessor;
+                // use the text value only when preTasks is stored as a string, else the node's JSON form
+                JsonNode preTasksNode = task.get("preTasks");
+                String preTasksJson = preTasksNode.isTextual() ? preTasksNode.asText() : preTasksNode.toString();
+                List<String> preTasks = JSONUtils.toList(preTasksJson, String.class);
                 taskNamePreMap.put(name, preTasks);
                 taskNameCodeMap.put(name, taskCode);
             }
+            convertConditions(taskDefinitionLogList, taskNameCodeMap);
+            taskDefinitionLogs.addAll(taskDefinitionLogList);
             
processDefinition.setLocations(convertLocations(processDefinition.getLocations(),
 taskIdCodeMap));
             ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
             processDefinitionLog.setOperator(1);
@@ -688,6 +699,45 @@ public abstract class UpgradeDao extends AbstractBaseDao {
             processDefinitionLog.setUpdateTime(now);
             processDefinitionLogs.add(processDefinitionLog);
             handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, 
processDefinition, processTaskRelationLogs);
+            processCodeTaskNameCodeMap.put(processDefinition.getCode(), 
taskNameCodeMap);
+            processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
+        }
+    }
+
+    /**
+     * Rewrites the task params of every CONDITIONS task so that other tasks are
+     * referenced by task code instead of by task name.
+     *
+     * <p>In {@code conditionResult}, the name lists {@code successNode} and {@code failedNode}
+     * are replaced by lists of codes. In every item of
+     * {@code dependence.dependTaskList[*].dependItemList}, the {@code depTasks} name is replaced
+     * by a {@code depTaskCode} field. A name that is missing from {@code taskNameCodeMap} is
+     * written as JSON null.
+     *
+     * @param taskDefinitionLogList task definitions to convert; their task params are updated in place
+     * @param taskNameCodeMap       task name -> task code
+     * @throws Exception declared for signature compatibility with existing callers
+     */
+    public void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList, Map<String, Long> taskNameCodeMap) throws Exception {
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
+            if (!TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())) {
+                continue;
+            }
+            ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
+            // reset conditionResult: read the names first, then overwrite the field with the codes
+            ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult");
+            for (String branch : new String[] {"successNode", "failedNode"}) {
+                List<String> nodeNames = JSONUtils.toList(conditionResult.get(branch).toString(), String.class);
+                ArrayNode nodeCodes = conditionResult.putArray(branch);
+                for (String nodeName : nodeNames) {
+                    nodeCodes.add(taskNameCodeMap.get(nodeName));
+                }
+            }
+            // reset dependItemList: the lists are re-parsed copies, so they are attached again after editing
+            ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
+            ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
+            for (int i = 0; i < dependTaskList.size(); i++) {
+                ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
+                ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
+                for (int j = 0; j < dependItemList.size(); j++) {
+                    ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
+                    dependItem.put("depTaskCode", taskNameCodeMap.get(dependItem.get("depTasks").asText()));
+                    dependItem.remove("depTasks");
+                }
+                dependTask.set("dependItemList", dependItemList);
+            }
+            dependence.set("dependTaskList", dependTaskList);
+            taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
         }
     }
 
@@ -709,6 +759,44 @@ public abstract class UpgradeDao extends AbstractBaseDao {
         return jsonNodes.toString();
     }
 
+    /**
+     * Rewrites the task params of every DEPENDENT task so that each item of
+     * {@code dependence.dependTaskList[*].dependItemList} refers to its target by code.
+     *
+     * <p>Per item: {@code projectCode} is looked up from {@code projectId}; {@code definitionCode}
+     * and {@code depTaskCode} are looked up from {@code definitionId} and {@code depTasks}
+     * ({@code depTaskCode} is 0 when {@code depTasks} is "ALL"). The fields {@code projectId},
+     * {@code definitionId} and {@code depTasks} are then removed.
+     *
+     * <p>The conversion is best-effort: when {@code processTaskMap} has no (or an empty) entry
+     * for {@code definitionId}, no {@code definitionCode} / {@code depTaskCode} is written; a task
+     * name that is not found is written as JSON null instead of failing with a NullPointerException.
+     *
+     * @param taskDefinitionLogs task definitions to convert; their task params are updated in place
+     * @param projectIdCodeMap   looked up with the item's {@code projectId} to obtain the project code
+     * @param processTaskMap     looked up with the item's {@code definitionId}; the first entry of the
+     *                           result maps the process definition code to (task name -> task code)
+     */
+    public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
+                                  Map<Integer, Long> projectIdCodeMap,
+                                  Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            if (!TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
+                continue;
+            }
+            ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
+            ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
+            // the lists are re-parsed copies, so they are attached again after editing
+            ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
+            for (int i = 0; i < dependTaskList.size(); i++) {
+                ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
+                ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
+                for (int j = 0; j < dependItemList.size(); j++) {
+                    ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
+                    dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt()));
+                    int definitionId = dependItem.get("definitionId").asInt();
+                    Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
+                    // the map has no entry when the referenced definition was not converted: treat as "not found"
+                    Optional<Map.Entry<Long, Map<String, Long>>> mapEntry = processCodeTaskNameCodeMap == null
+                            ? Optional.empty()
+                            : processCodeTaskNameCodeMap.entrySet().stream().findFirst();
+                    if (mapEntry.isPresent()) {
+                        Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = mapEntry.get();
+                        dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());
+                        String depTasks = dependItem.get("depTasks").asText();
+                        // keep the boxed type: an unknown task name must not be auto-unboxed (NPE)
+                        Long taskCode = 0L;
+                        if (!"ALL".equals(depTasks)) {
+                            taskCode = processCodeTaskNameCodeEntry.getValue().get(depTasks);
+                        }
+                        dependItem.put("depTaskCode", taskCode);
+                    }
+                    dependItem.remove("projectId");
+                    dependItem.remove("definitionId");
+                    dependItem.remove("depTasks");
+                }
+                dependTask.set("dependItemList", dependItemList);
+            }
+            dependence.set("dependTaskList", dependTaskList);
+            taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
+        }
+    }
+
     private void handleProcessTaskRelation(Map<String, List<String>> 
taskNamePreMap,
                                            Map<String, Long> taskNameCodeMap,
                                            ProcessDefinition processDefinition,

Reply via email to