This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new a32b40e [Feature][JsonSplit] add gen taskNodeList method (#4861)
a32b40e is described below
commit a32b40ee95e74ef83d673f5687e6902125443c41
Author: JinyLeeChina <[email protected]>
AuthorDate: Wed Feb 24 17:36:01 2021 +0800
[Feature][JsonSplit] add gen taskNodeList method (#4861)
* add gen taskNodeList method
* modify code style
* modify code style
Co-authored-by: JinyLeeChina <[email protected]>
---
.../common/model/PreviousTaskNode.java | 66 +++++++++
.../dolphinscheduler/common/model/TaskNode.java | 28 ++--
.../dao/entity/TaskDefinition.java | 8 ++
.../dao/mapper/TaskDefinitionLogMapper.java | 12 +-
.../dao/mapper/TaskDefinitionLogMapper.xml | 13 ++
.../service/process/ProcessService.java | 149 ++++++++++++++-------
6 files changed, 216 insertions(+), 60 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/PreviousTaskNode.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/PreviousTaskNode.java
new file mode 100644
index 0000000..c0ac665
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/PreviousTaskNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.model;
+
+/**
+ * Lightweight reference to a previous task of a task node: the previous
+ * task's code and version, plus its name.
+ */
+public class PreviousTaskNode {
+
+    /** code of the previous task */
+    private long code;
+
+    /** name of the previous task */
+    private String name;
+
+    /** version of the previous task */
+    private int version;
+
+    /**
+     * Build a reference from all three attributes.
+     *
+     * @param code code of the previous task
+     * @param name name of the previous task
+     * @param version version of the previous task
+     */
+    public PreviousTaskNode(long code, String name, int version) {
+        setCode(code);
+        setName(name);
+        setVersion(version);
+    }
+
+    /** @return code of the previous task */
+    public final long getCode() {
+        return this.code;
+    }
+
+    /** @param code code of the previous task */
+    public final void setCode(long code) {
+        this.code = code;
+    }
+
+    /** @return name of the previous task */
+    public final String getName() {
+        return this.name;
+    }
+
+    /** @param name name of the previous task */
+    public final void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return version of the previous task */
+    public final int getVersion() {
+        return this.version;
+    }
+
+    /** @param version version of the previous task */
+    public final void setVersion(int version) {
+        this.version = version;
+    }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index 0072b93..036d175 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -22,18 +22,15 @@ import
org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.*;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
public class TaskNode {
@@ -102,6 +99,11 @@ public class TaskNode {
private String preTasks;
/**
+ * node dependency list
+ */
+ private List<PreviousTaskNode> preTaskNodeList;
+
+ /**
* users store additional information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@@ -197,7 +199,7 @@ public class TaskNode {
return preTasks;
}
- public void setPreTasks(String preTasks) throws IOException {
+ public void setPreTasks(String preTasks) {
this.preTasks = preTasks;
this.depList = JSONUtils.toList(preTasks, String.class);
}
@@ -214,7 +216,7 @@ public class TaskNode {
return depList;
}
- public void setDepList(List<String> depList) throws
JsonProcessingException {
+ public void setDepList(List<String> depList) {
this.depList = depList;
this.preTasks = JSONUtils.toJsonString(depList);
}
@@ -373,6 +375,14 @@ public class TaskNode {
return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
}
+ /**
+ * get the node dependency list
+ *
+ * @return previous task nodes of this node
+ */
+ public List<PreviousTaskNode> getPreTaskNodeList() {
+ return preTaskNodeList;
+ }
+
+ /**
+ * set the node dependency list
+ *
+ * @param preTaskNodeList previous task nodes of this node
+ */
+ public void setPreTaskNodeList(List<PreviousTaskNode> preTaskNodeList) {
+ this.preTaskNodeList = preTaskNodeList;
+ }
+
@Override
public String toString() {
return "TaskNode{"
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 1158aae..3ca10a2 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -167,6 +167,14 @@ public class TaskDefinition {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
+ /**
+ * no-arg constructor, leaves every field at its default value
+ */
+ public TaskDefinition() {
+ }
+
+ /**
+ * create a task definition that carries only its code and version
+ *
+ * @param code task definition code
+ * @param version task definition version
+ */
+ public TaskDefinition(long code, int version) {
+ this.code = code;
+ this.version = version;
+ }
+
public String getName() {
return name;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index e49e01e..71f72ab 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.dao.mapper;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.ibatis.annotations.Param;
+import java.util.Collection;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@@ -38,7 +40,7 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
* @return task definition log list
*/
List<TaskDefinitionLog> queryByDefinitionName(@Param("projectCode") Long
projectCode,
- @Param("taskDefinitionName") String
name);
+ @Param("taskDefinitionName")
String name);
/**
* query task definition log list
@@ -57,4 +59,12 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
*/
TaskDefinitionLog
queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long
taskDefinitionCode,
@Param("version") int
version);
+
+ /**
+ * query task definition logs by a collection of task definitions
+ * a log row matches when both its code and its version equal those of any given task definition
+ *
+ * @param taskDefinitions task definitions supplying the code and version pairs to look up
+ * @return task definition log list
+ */
+ List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions")
Collection<TaskDefinition> taskDefinitions);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
index 689eff2..5670f06 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
@@ -47,4 +47,17 @@
WHERE code = #{taskDefinitionCode}
and version = #{version}
</select>
+ <!--
+     Select the task definition logs whose code and version match any item of taskDefinitions.
+     taskDefinitions is a java.util.Collection, so emptiness is tested with size():
+     a Collection exposes no "length" property for the OGNL test expression to resolve.
+     When the collection is null or empty no filter is appended and every row is selected.
+ -->
+ <select id="queryByTaskDefinitions" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
+     select
+     <include refid="baseSql"/>
+     from t_ds_task_definition_log
+     WHERE 1 = 1
+     <if test="taskDefinitions != null and taskDefinitions.size() != 0">
+         and
+         <foreach collection="taskDefinitions" index="index" item="item" open="(" separator=" or " close=")">
+             (code = #{item.code}
+             and version = #{item.version})
+         </foreach>
+     </if>
+ </select>
</mapper>
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index fd888e9..abc0037 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -46,13 +46,14 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.DateInterval;
+import org.apache.dolphinscheduler.common.model.PreviousTaskNode;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
+import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@@ -499,7 +500,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentId parentId
- * @param ids ids
+ * @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
List<TaskNode> taskNodeList =
this.getTaskNodeListByDefinitionId(parentId);
@@ -524,7 +525,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at
the same time.
* if the recovery command is exists, only update the field update_time
*
- * @param originCommand originCommand
+ * @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand,
ProcessInstance processInstance) {
@@ -576,7 +577,7 @@ public class ProcessService {
/**
* get schedule time from command
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return date
*/
@@ -592,8 +593,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
- * @param command command
- * @param cmdParam cmdParam map
+ * @param command command
+ * @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition
processDefinition,
@@ -667,7 +668,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
- * @param userId userId
+ * @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -690,7 +691,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
@@ -710,7 +711,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
- * @param host host
+ * @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String
host) {
@@ -868,7 +869,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
- * @param command command
+ * @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance
processInstance, Command command) {
@@ -883,8 +884,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
- * @param processInstance processInstance
- * @param cmdParam cmdParam
+ * @param processInstance processInstance
+ * @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@@ -956,7 +957,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
- * @param subGlobalParams subGlobalParams
+ * @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String
subGlobalParams) {
@@ -1026,7 +1027,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance
parentInstance, TaskInstance parentTask) {
@@ -1055,7 +1056,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance
parentProcessInstance,
@@ -1081,7 +1082,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
- * @param task task
+ * @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance,
TaskInstance task) {
if (!task.isSubProcess()) {
@@ -1180,7 +1181,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
- * @param childDefinitionId childDefinitionId
+ * @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance
parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition =
this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@@ -1194,7 +1195,7 @@ public class ProcessService {
/**
* submit task to mysql
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@@ -1248,7 +1249,7 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @return process instance state
*/
@@ -1424,7 +1425,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
- * @param state state
+ * @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId,
ExecutionStatus state) {
@@ -1479,7 +1480,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer
parentWorkProcessId, Integer parentTaskId) {
@@ -1501,7 +1502,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId,
Integer parentTaskId) {
@@ -1533,12 +1534,12 @@ public class ProcessService {
/**
* change task state
*
- * @param state state
- * @param startTime startTime
- * @param host host
+ * @param state state
+ * @param startTime startTime
+ * @param host host
* @param executePath executePath
- * @param logPath logPath
- * @param taskInstId taskInstId
+ * @param logPath logPath
+ * @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state, Date startTime, String host,
String executePath,
@@ -1566,12 +1567,12 @@ public class ProcessService {
* update the process instance
*
* @param processInstanceId processInstanceId
- * @param processJson processJson
- * @param globalParams globalParams
- * @param scheduleTime scheduleTime
- * @param flag flag
- * @param locations locations
- * @param connects connects
+ * @param processJson processJson
+ * @param globalParams globalParams
+ * @param scheduleTime scheduleTime
+ * @param flag flag
+ * @param locations locations
+ * @param connects connects
* @return update process instance result
*/
public int updateProcessInstance(Integer processInstanceId, String
processJson,
@@ -1592,10 +1593,10 @@ public class ProcessService {
/**
* change task state
*
- * @param state state
- * @param endTime endTime
+ * @param state state
+ * @param endTime endTime
* @param taskInstId taskInstId
- * @param varPool varPool
+ * @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
Date endTime,
@@ -1763,7 +1764,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
- * @param executionStatus executionStatus
+ * @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId,
ExecutionStatus executionStatus) {
@@ -1800,7 +1801,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
- * @param resName resource name
+ * @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
@@ -1824,9 +1825,9 @@ public class ProcessService {
/**
* get dependency cycle by work process define id and scheduler fire time
*
- * @param masterId masterId
+ * @param masterId masterId
* @param processDefinitionId processDefinitionId
- * @param scheduledFireTime the time the task schedule is expected to
trigger
+ * @param scheduledFireTime the time the task schedule is expected to
trigger
* @return CycleDependency
* @throws Exception if error throws Exception
*/
@@ -1839,8 +1840,8 @@ public class ProcessService {
/**
* get dependency cycle list by work process define id list and scheduler
fire time
*
- * @param masterId masterId
- * @param ids ids
+ * @param masterId masterId
+ * @param ids ids
* @param scheduledFireTime the time the task schedule is expected to
trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
@@ -1935,8 +1936,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionId process definition id
- * @param startTime start time
- * @param endTime end time
+ * @param startTime start time
+ * @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date
startTime, Date endTime) {
@@ -2036,7 +2037,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
- * @param userId user id
+ * @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
@@ -2227,7 +2228,6 @@ public class ProcessService {
/**
* save processDefinition (including create or update processDefinition)
- *
*/
public int saveProcessDefinition(User operator, Project project, String
name, String desc, String locations,
String connects, ProcessData processData,
ProcessDefinition processDefinition) {
@@ -2403,10 +2403,6 @@ public class ProcessService {
/**
* get process task relation list
* this function can be query relation list from log record
- *
- * @param processCode
- * @param processVersion
- * @return
*/
public List<ProcessTaskRelation> getProcessTaskRelationList(Long
processCode, int processVersion) {
List<ProcessTaskRelationLog> taskRelationLogs =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(
@@ -2419,4 +2415,57 @@ public class ProcessService {
return processTaskRelations;
}
+ public List<TaskNode> genTaskNodeList(Long processCode, int
processVersion) {
+ List<ProcessTaskRelation> processTaskRelations =
this.getProcessTaskRelationList(processCode, processVersion);
+ Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
+ Map<Long, TaskNode> taskNodeMap = new HashMap<>();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (processTaskRelation.getPreTaskCode() > 0) {
+ taskDefinitionSet.add(new
TaskDefinition(processTaskRelation.getPreTaskCode(),
processTaskRelation.getPreNodeVersion()));
+ }
+ if (processTaskRelation.getPostTaskCode() > 0) {
+ taskDefinitionSet.add(new
TaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostNodeVersion()));
+ }
+ taskNodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v)
-> {
+ if (v == null) {
+ v = new TaskNode();
+ v.setCode(processTaskRelation.getPostTaskCode());
+ v.setVersion(processTaskRelation.getPostNodeVersion());
+
v.setConditionResult(processTaskRelation.getConditionParams());
+ List<PreviousTaskNode> preTaskNodeList = new ArrayList<>();
+ if (processTaskRelation.getPreTaskCode() > 0) {
+ preTaskNodeList.add(new
PreviousTaskNode(processTaskRelation.getPreTaskCode(), "",
processTaskRelation.getPreNodeVersion()));
+ }
+ v.setPreTaskNodeList(preTaskNodeList);
+ } else {
+ List<PreviousTaskNode> preTaskDefinitionList =
v.getPreTaskNodeList();
+ preTaskDefinitionList.add(new
PreviousTaskNode(processTaskRelation.getPreTaskCode(), "",
processTaskRelation.getPreNodeVersion()));
+ }
+ return v;
+ });
+ }
+ List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
+ Map<Long, TaskDefinitionLog> taskDefinitionLogMap =
taskDefinitionLogs.stream().collect(Collectors.toMap(TaskDefinitionLog::getCode,
log -> log));
+ taskNodeMap.forEach((k, v) -> {
+ TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k);
+ v.setId("task-" + taskDefinitionLog.getId());
+ v.setCode(taskDefinitionLog.getCode());
+ v.setName(taskDefinitionLog.getName());
+ v.setDesc(taskDefinitionLog.getDescription());
+ v.setType(taskDefinitionLog.getTaskType().getDescp());
+ v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ?
Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : "NORMAL");
+ v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
+ v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
+ v.setParams(taskDefinitionLog.getTaskParams());
+ v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
+ v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
+ v.setTimeout(JSONUtils.toJsonString(new
TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
+ taskDefinitionLog.getTaskTimeoutStrategy(),
+ taskDefinitionLog.getTimeout())));
+ // TODO name will be remove
+ v.getPreTaskNodeList().forEach(task ->
task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
+
v.setPreTasks(StringUtils.join(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList()),
","));
+ });
+ return new ArrayList<>(taskNodeMap.values());
+ }
}