This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1144360 add convert dependent/conditions (#6710)
1144360 is described below
commit 1144360257cc901c70962971a0777930dcb39e3c
Author: JinYong Li <[email protected]>
AuthorDate: Fri Nov 5 23:02:46 2021 +0800
add convert dependent/conditions (#6710)
---
.../dolphinscheduler/dao/upgrade/UpgradeDao.java | 110 ++++++++++++++++++---
1 file changed, 99 insertions(+), 11 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index a7acafc..235f223 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
@@ -54,6 +55,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import javax.sql.DataSource;
@@ -61,6 +63,8 @@ import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -599,7 +603,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
List<ProcessDefinitionLog> processDefinitionLogs = new
ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new
ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
- splitProcessDefinitionJson(processDefinitions,
processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs,
taskDefinitionLogs);
+ Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new
HashMap<>();
+ splitProcessDefinitionJson(processDefinitions,
processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs,
taskDefinitionLogs, processTaskMap);
+ convertDependence(taskDefinitionLogs, projectIdCodeMap,
processTaskMap);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(),
processDefinitionLogs);
@@ -614,7 +620,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
Map<Integer, String>
processDefinitionJsonMap,
List<ProcessDefinitionLog>
processDefinitionLogs,
List<ProcessTaskRelationLog>
processTaskRelationLogs,
- List<TaskDefinitionLog>
taskDefinitionLogs) throws Exception {
+ List<TaskDefinitionLog>
taskDefinitionLogs,
+ Map<Integer, Map<Long, Map<String,
Long>>> processTaskMap) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap =
processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId,
processDefinition -> processDefinition));
Date now = new Date();
@@ -634,6 +641,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
Map<String, Long> taskIdCodeMap = new HashMap<>();
Map<String, List<String>> taskNamePreMap = new HashMap<>();
Map<String, Long> taskNameCodeMap = new HashMap<>();
+ Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new
HashMap<>();
+ List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
ArrayNode tasks =
JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
@@ -647,7 +656,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
- taskDefinitionLog.setTaskParams(param.toString());
+ taskDefinitionLog.setTaskParams(param.asText());
}
TaskTimeoutParameter timeout =
JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")),
TaskTimeoutParameter.class);
if (timeout != null) {
@@ -655,15 +664,15 @@ public abstract class UpgradeDao extends AbstractBaseDao {
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ?
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
-
taskDefinitionLog.setDescription(task.get("description").toString());
-
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").toString())
? Flag.YES : Flag.NO);
- taskDefinitionLog.setTaskType(task.get("type").toString());
+
taskDefinitionLog.setDescription(task.get("description").asText());
+
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText())
? Flag.YES : Flag.NO);
+ taskDefinitionLog.setTaskType(task.get("type").asText());
taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")),
Priority.class));
- String name = task.get("name").toString();
+ String name = task.get("name").asText();
taskDefinitionLog.setName(name);
-
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").toString());
+
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
long taskCode = SnowFlakeUtils.getInstance().nextId();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
@@ -675,12 +684,14 @@ public abstract class UpgradeDao extends AbstractBaseDao {
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
- taskDefinitionLogs.add(taskDefinitionLog);
- taskIdCodeMap.put(task.get("id").toString(), taskCode);
- List<String> preTasks =
JSONUtils.toList(task.get("preTasks").toString(), String.class);
+ taskDefinitionLogList.add(taskDefinitionLog);
+ taskIdCodeMap.put(task.get("id").asText(), taskCode);
+ List<String> preTasks =
JSONUtils.toList(task.get("preTasks").asText(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
+ convertConditions(taskDefinitionLogList, taskNameCodeMap);
+ taskDefinitionLogs.addAll(taskDefinitionLogList);
processDefinition.setLocations(convertLocations(processDefinition.getLocations(),
taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
@@ -688,6 +699,45 @@ public abstract class UpgradeDao extends AbstractBaseDao {
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap,
processDefinition, processTaskRelationLogs);
+ processCodeTaskNameCodeMap.put(processDefinition.getCode(),
taskNameCodeMap);
+ processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
+ }
+ }
+
+    /**
+     * Rewrites the params of every CONDITIONS task so that other tasks are referenced by code
+     * instead of by name.
+     * <p>
+     * For each entry whose task type equals {@code TaskType.CONDITIONS.getDesc()}:
+     * <ul>
+     *   <li>the task names in {@code conditionResult.successNode} and {@code conditionResult.failedNode}
+     *       are replaced by the matching task codes;</li>
+     *   <li>in every item of {@code dependence.dependTaskList[*].dependItemList} the {@code depTasks}
+     *       name is replaced by a {@code depTaskCode} field.</li>
+     * </ul>
+     * The converted JSON is written back through {@code setTaskParams}. A name that is not present in
+     * {@code taskNameCodeMap} is written as JSON null. Tasks of any other type are left untouched, and
+     * sections that are absent from the params are skipped instead of raising a NullPointerException.
+     *
+     * @param taskDefinitionLogList task definitions to convert; matching entries are updated in place
+     * @param taskNameCodeMap       task name to task code mapping used for the lookup
+     * @throws Exception declared for compatibility with existing callers
+     */
+    public void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList, Map<String, Long> taskNameCodeMap) throws Exception {
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
+            if (!TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())) {
+                continue;
+            }
+            ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
+            if (taskParams == null) {
+                // NOTE(review): parseObject presumably yields null for empty params - confirm against JSONUtils.
+                // Either way there is nothing to convert, so keep the stored value and move on.
+                LoggerFactory.getLogger(UpgradeDao.class)
+                        .warn("Skip CONDITIONS task, taskParams cannot be parsed: {}", taskDefinitionLog.getTaskParams());
+                continue;
+            }
+            // reset conditionResult (instanceof also filters a JSON null placeholder)
+            JsonNode conditionResult = taskParams.get("conditionResult");
+            if (conditionResult instanceof ObjectNode) {
+                ObjectNode result = (ObjectNode) conditionResult;
+                result.set("successNode", mapTaskNamesToCodes(result.path("successNode"), taskNameCodeMap));
+                result.set("failedNode", mapTaskNamesToCodes(result.path("failedNode"), taskNameCodeMap));
+            }
+            // reset dependItemList; the tree is edited in place, path() yields an empty iteration when a level is absent
+            JsonNode dependence = taskParams.get("dependence");
+            if (dependence != null) {
+                for (JsonNode dependTask : dependence.path("dependTaskList")) {
+                    for (JsonNode dependItem : dependTask.path("dependItemList")) {
+                        JsonNode depTasks = dependItem.get("depTasks");
+                        if (depTasks == null || !(dependItem instanceof ObjectNode)) {
+                            // no task name to translate on this item
+                            continue;
+                        }
+                        ObjectNode item = (ObjectNode) dependItem;
+                        item.put("depTaskCode", taskNameCodeMap.get(depTasks.asText()));
+                        item.remove("depTasks");
+                    }
+                }
+            }
+            taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
+        }
+    }
+
+    /**
+     * Builds a JSON array holding, in order, the task code of every task name in {@code taskNames};
+     * names without a mapping become JSON null. A missing or non-array input gives an empty array.
+     */
+    private static ArrayNode mapTaskNamesToCodes(JsonNode taskNames, Map<String, Long> taskNameCodeMap) {
+        ArrayNode codes = JsonNodeFactory.instance.arrayNode();
+        for (JsonNode taskName : taskNames) {
+            // ArrayNode.add(Long) stores a null node for a null argument
+            codes.add(taskNameCodeMap.get(taskName.asText()));
+        }
+        return codes;
+    }
@@ -709,6 +759,44 @@ public abstract class UpgradeDao extends AbstractBaseDao {
return jsonNodes.toString();
}
+    /**
+     * Rewrites the params of every DEPENDENT task so that projects, process definitions and tasks
+     * are referenced by code instead of by legacy id / name.
+     * <p>
+     * For each entry whose task type equals {@code TaskType.DEPENDENT.getDesc()}, every item of
+     * {@code dependence.dependTaskList[*].dependItemList} gets {@code projectCode}, and, when the
+     * referenced process definition is known, {@code definitionCode} and {@code depTaskCode}; the
+     * legacy {@code projectId}, {@code definitionId} and {@code depTasks} fields are removed.
+     * The converted JSON is written back through {@code setTaskParams}. Unknown references are logged
+     * and left without a code instead of aborting with a NullPointerException.
+     *
+     * @param taskDefinitionLogs task definitions to convert; matching entries are updated in place
+     * @param projectIdCodeMap   project id to project code mapping
+     * @param processTaskMap     process definition id to (process definition code to (task name to task code))
+     */
+    public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
+                                  Map<Integer, Long> projectIdCodeMap,
+                                  Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            if (!TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
+                continue;
+            }
+            ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
+            if (taskParams == null) {
+                // NOTE(review): parseObject presumably yields null for empty params - confirm against JSONUtils.
+                LoggerFactory.getLogger(UpgradeDao.class)
+                        .warn("Skip DEPENDENT task, taskParams cannot be parsed: {}", taskDefinitionLog.getTaskParams());
+                continue;
+            }
+            // The tree is edited in place; path() yields an empty iteration when a level is absent.
+            JsonNode dependence = taskParams.get("dependence");
+            if (dependence != null) {
+                for (JsonNode dependTask : dependence.path("dependTaskList")) {
+                    for (JsonNode dependItem : dependTask.path("dependItemList")) {
+                        if (dependItem instanceof ObjectNode) {
+                            rewriteDependItemToCodes((ObjectNode) dependItem, projectIdCodeMap, processTaskMap);
+                        }
+                    }
+                }
+            }
+            taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
+        }
+    }
+
+    /**
+     * Converts a single dependent item from id / name references to code references, in place.
+     * {@code depTasks} equal to "ALL" maps to task code 0; a task name without a mapping is written
+     * as JSON null.
+     */
+    private static void rewriteDependItemToCodes(ObjectNode dependItem,
+                                                 Map<Integer, Long> projectIdCodeMap,
+                                                 Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
+        // path() never returns null, so a missing id reads as 0 and simply finds no mapping
+        dependItem.put("projectCode", projectIdCodeMap.get(dependItem.path("projectId").asInt()));
+        int definitionId = dependItem.path("definitionId").asInt();
+        Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
+        if (processCodeTaskNameCodeMap == null || processCodeTaskNameCodeMap.isEmpty()) {
+            LoggerFactory.getLogger(UpgradeDao.class)
+                    .warn("Cannot find process definition id {} referenced by a dependent item, definitionCode is not set", definitionId);
+        } else {
+            // only the first entry is used: process definition code -> (task name -> task code)
+            Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = processCodeTaskNameCodeMap.entrySet().iterator().next();
+            dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());
+            String depTasks = dependItem.path("depTasks").asText();
+            // Keep the code boxed: mixing 0L and a Long in a ternary would unbox and throw on an unknown name.
+            Long taskCode;
+            if ("ALL".equals(depTasks)) {
+                taskCode = 0L;
+            } else {
+                taskCode = processCodeTaskNameCodeEntry.getValue().get(depTasks);
+                if (taskCode == null) {
+                    LoggerFactory.getLogger(UpgradeDao.class)
+                            .warn("Cannot find task {} in process definition id {}, depTaskCode is set to null", depTasks, definitionId);
+                }
+            }
+            dependItem.put("depTaskCode", taskCode);
+        }
+        dependItem.remove("projectId");
+        dependItem.remove("definitionId");
+        dependItem.remove("depTasks");
+    }
+
private void handleProcessTaskRelation(Map<String, List<String>>
taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,