This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 86ce8f1 [JsonSplit-4417][master/worker/api]json split- remove json in
task instance. (#5160)
86ce8f1 is described below
commit 86ce8f133a42a2464a4916c6e0b42dc74628c70d
Author: bao liang <[email protected]>
AuthorDate: Sat Apr 3 17:58:04 2021 +0800
[JsonSplit-4417][master/worker/api]json split- remove json in task
instance. (#5160)
* [json split] refactor json in task instance
* add task_params in task_instance
* feature #4417 refactor json split in task instance
* code style
* code style
* code style
* code style
* code style
* update
* update
* update unit test
* update
---
.../service/impl/ProcessDefinitionServiceImpl.java | 15 +-
.../api/service/ProcessDefinitionServiceTest.java | 33 +---
.../common/enums/TaskTimeoutStrategy.java | 1 +
.../dolphinscheduler/dao/entity/TaskInstance.java | 33 ++--
.../dao/mapper/ProcessTaskRelationLogMapper.java | 5 +
.../dao/mapper/TaskDefinitionLogMapper.java | 7 +-
.../dao/mapper/ProcessTaskRelationLogMapper.xml | 10 ++
.../dao/entity/TaskInstanceTest.java | 37 +++--
.../dao/mapper/TaskInstanceMapperTest.java | 1 -
.../builder/TaskExecutionContextBuilder.java | 13 +-
.../cache/impl/TaskInstanceCacheManagerImpl.java | 1 -
.../master/consumer/TaskPriorityQueueConsumer.java | 41 +++--
.../master/runner/ConditionsTaskExecThread.java | 2 +-
.../master/runner/DependentTaskExecThread.java | 3 +-
.../master/runner/MasterBaseTaskExecThread.java | 12 +-
.../server/master/runner/MasterExecThread.java | 9 +-
.../server/worker/runner/TaskExecuteThread.java | 39 +----
.../server/master/ConditionsTaskTest.java | 1 -
.../server/master/DependentTaskTest.java | 22 ++-
.../server/master/SubProcessTaskTest.java | 3 +-
.../consumer/TaskPriorityQueueConsumerTest.java | 182 +--------------------
.../master/runner/MasterTaskExecThreadTest.java | 1 -
.../worker/shell/ShellCommandExecutorTest.java | 1 -
.../server/worker/sql/SqlExecutorTest.java | 4 +-
.../service/process/ProcessService.java | 93 +++++++++--
.../service/process/ProcessServiceTest.java | 2 +-
sql/dolphinscheduler_mysql.sql | 1 +
sql/dolphinscheduler_postgre.sql | 1 +
28 files changed, 238 insertions(+), 335 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 319c636..13048c5 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -57,6 +57,8 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
@@ -70,6 +72,8 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.commons.collections.map.HashedMap;
+
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -1249,6 +1253,10 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* List of process instances
*/
List<ProcessInstance> processInstanceList =
processInstanceService.queryByProcessDefineCode(processDefinition.getCode(),
limit);
+ List<TaskDefinitionLog> taskDefinitionList =
processService.queryTaskDefinitionList(processDefinition.getCode(),
+ processDefinition.getVersion());
+ Map<Long, TaskDefinition> taskDefinitionMap = new HashedMap();
+ taskDefinitionList.forEach(taskDefinitionLog ->
taskDefinitionMap.put(taskDefinitionLog.getCode(), taskDefinitionLog));
for (ProcessInstance processInstance : processInstanceList) {
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(),
processInstance.getEndTime()));
@@ -1305,11 +1313,10 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* if process is sub process, the return sub id, or
sub id=0
*/
- if
(taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) {
- String taskJson = taskInstance.getTaskJson();
- taskNode = JSONUtils.parseObject(taskJson,
TaskNode.class);
+ if (taskInstance.isSubProcess()) {
+ TaskDefinition taskDefinition =
taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessId =
Integer.parseInt(JSONUtils.parseObject(
-
taskNode.getParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
+
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
}
treeViewDto.getInstances().add(new
Instance(taskInstance.getId(), taskInstance.getName(),
taskInstance.getTaskType(), taskInstance.getState().toString()
, taskInstance.getStartTime(),
taskInstance.getEndTime(), taskInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()),
subProcessId));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 234dd4d..9735512 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -837,38 +837,7 @@ public class ProcessDefinitionServiceTest {
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setHost("192.168.xx.xx");
- taskInstance.setTaskJson("{\n"
- + " \"conditionResult\": {\n"
- + " \"failedNode\": [\n"
- + " \"\"\n"
- + " ],\n"
- + " \"successNode\": [\n"
- + " \"\"\n"
- + " ]\n"
- + " },\n"
- + " \"delayTime\": \"0\",\n"
- + " \"dependence\": {},\n"
- + " \"description\": \"\",\n"
- + " \"id\": \"1\",\n"
- + " \"maxRetryTimes\": \"0\",\n"
- + " \"name\": \"test_task_instance\",\n"
- + " \"params\": {\n"
- + " \"processDefinitionId\": \"222\",\n"
- + " \"resourceList\": []\n"
- + " },\n"
- + " \"preTasks\": [],\n"
- + " \"retryInterval\": \"1\",\n"
- + " \"runFlag\": \"NORMAL\",\n"
- + " \"taskInstancePriority\": \"MEDIUM\",\n"
- + " \"timeout\": {\n"
- + " \"enable\": false,\n"
- + " \"interval\": null,\n"
- + " \"strategy\": \"\"\n"
- + " },\n"
- + " \"type\": \"SUB_PROCESS\",\n"
- + " \"workerGroup\": \"default\"\n"
- + "}");
- //task instance exist
+ taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new
DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineCode(46L,
10)).thenReturn(processInstanceList);
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
index a8bd325..335b986 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
@@ -57,4 +57,5 @@ public enum TaskTimeoutStrategy {
}
throw new IllegalArgumentException("invalid status : " + status);
}
+
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 73d4bca..00f2b9a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
@@ -75,7 +76,7 @@ public class TaskInstance implements Serializable {
private long taskCode;
/**
- * process definition code
+ * process definition code
*/
private long processDefinitionCode;
@@ -184,7 +185,7 @@ public class TaskInstance implements Serializable {
* dependency
*/
@TableField(exist = false)
- private String dependency;
+ private DependentParameters dependency;
/**
* duration
@@ -235,7 +236,7 @@ public class TaskInstance implements Serializable {
* varPool string
*/
private String varPool;
-
+
/**
* executor name
*/
@@ -251,6 +252,11 @@ public class TaskInstance implements Serializable {
*/
private int delayTime;
+ /**
+ * task params
+ */
+ private String taskParams;
+
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
@@ -264,7 +270,7 @@ public class TaskInstance implements Serializable {
public void setVarPool(String varPool) {
this.varPool = varPool;
}
-
+
public ProcessInstance getProcessInstance() {
return processInstance;
}
@@ -429,15 +435,14 @@ public class TaskInstance implements Serializable {
this.appLink = appLink;
}
- public String getDependency() {
- if (this.dependency != null) {
- return this.dependency;
+ public DependentParameters getDependency() {
+ if (this.dependency == null) {
+ this.dependency = JSONUtils.parseObject(this.getTaskParams(),
DependentParameters.class);
}
- TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
- return taskNode == null ? null : taskNode.getDependence();
+ return this.dependency;
}
- public void setDependency(String dependency) {
+ public void setDependency(DependentParameters dependency) {
this.dependency = dependency;
}
@@ -644,4 +649,12 @@ public class TaskInstance implements Serializable {
public void setTaskDefinitionVersion(int taskDefinitionVersion) {
this.taskDefinitionVersion = taskDefinitionVersion;
}
+
+ public String getTaskParams() {
+ return taskParams;
+ }
+
+ public void setTaskParams(String taskParams) {
+ this.taskParams = taskParams;
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
index deda046..9183b0f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
@@ -39,4 +39,9 @@ public interface ProcessTaskRelationLogMapper extends
BaseMapper<ProcessTaskRela
*/
List<ProcessTaskRelationLog>
queryByProcessCodeAndVersion(@Param("processCode") long processCode,
@Param("processVersion") int processVersion);
+
+ List<ProcessTaskRelationLog> queryByTaskRelationList(@Param("processCode")
long processCode,
+
@Param("processVersion") int processVersion,
+ @Param("taskCode")
long taskCode,
+ @Param("taskVersion")
long taskVersion);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index 59ea5c5..1ad40e0 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -59,11 +59,12 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
TaskDefinitionLog
queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long
taskDefinitionCode,
@Param("version") int
version);
+
/**
- * query task definition log
*
- * @param taskDefinitions taskDefinition collection
- * @return task definition log
+ * @param taskDefinitions
+ * @return
*/
List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions")
Collection<TaskDefinition> taskDefinitions);
+
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
index b604a1d..e7e4a12 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
@@ -29,4 +29,14 @@
WHERE process_definition_code = #{processCode}
and process_definition_version = #{processVersion}
</select>
+ <select id="queryByTaskRelationList"
+
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation_log
+ WHERE process_definition_code = #{processCode}
+ and process_definition_version = #{processVersion}
+ and post_task_code = #{taskCode}
+ and post_task_version = #{taskVersion}
+ </select>
</mapper>
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
index 5742c95..d2117dc 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
@@ -16,9 +16,16 @@
*/
package org.apache.dolphinscheduler.dao.entity;
+import org.apache.dolphinscheduler.common.enums.DependentRelation;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import java.util.ArrayList;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.Test;
@@ -57,25 +64,33 @@ public class TaskInstanceTest {
TaskNode taskNode;
taskInstance = new TaskInstance();
- taskInstance.setTaskJson(null);
Assert.assertNull(taskInstance.getDependency());
taskInstance = new TaskInstance();
taskNode = new TaskNode();
taskNode.setDependence(null);
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
Assert.assertNull(taskInstance.getDependency());
taskInstance = new TaskInstance();
- taskNode = new TaskNode();
- // expect a JSON here, and will be unwrap when toJsonString
- taskNode.setDependence("\"A\"");
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
- Assert.assertEquals("A", taskInstance.getDependency());
+
taskInstance.setTaskParams(JSONUtils.toJsonString(getDependentParameters()));
+ taskInstance.getDependency();
+ }
- taskInstance = new TaskInstance();
- taskInstance.setTaskJson(null);
- taskInstance.setDependency("{}");
- Assert.assertEquals("{}", taskInstance.getDependency());
+ /**
+ *
+ * @return
+ */
+ private DependentParameters getDependentParameters() {
+ DependentParameters dependentParameters = new DependentParameters();
+ List<DependentTaskModel> dependTaskList = new ArrayList<>();
+ List<DependentItem> dependentItems = new ArrayList<>();
+ DependentItem dependentItem = new DependentItem();
+ dependentItem.setDepTasks("A");
+ dependentItem.setDefinitionCode(222L);
+ dependentItem.setCycle("today");
+ dependentItems.add(dependentItem);
+ dependentParameters.setDependTaskList(dependTaskList);
+ dependentParameters.setRelation(DependentRelation.AND);
+ return dependentParameters;
}
}
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index 6dc348c..ee57a70 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -107,7 +107,6 @@ public class TaskInstanceMapperTest {
taskInstance.setState(state);
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
- taskInstance.setTaskJson("{}");
taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setTaskType(taskType);
taskInstance.setProcessDefinitionCode(1L);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 7bfd9a0..620100f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.builder;
+import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
+
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.*;
@@ -44,7 +46,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
- taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setResources(taskInstance.getResources());
@@ -52,6 +53,16 @@ public class TaskExecutionContextBuilder {
return this;
}
+ public TaskExecutionContextBuilder
buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) {
+ int timeoutSeconds = taskDefinition.getTimeout() *
SEC_2_MINUTES_TIME_UNIT;
+ if (timeoutSeconds >= Integer.MAX_VALUE) {
+ timeoutSeconds = Integer.MAX_VALUE;
+ }
+
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy().getCode());
+ taskExecutionContext.setTaskTimeout(timeoutSeconds);
+ return this;
+ }
+
/**
* build processInstance related info
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 4d55490..0dc7035 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -76,7 +76,6 @@ public class TaskInstanceCacheManagerImpl implements
TaskInstanceCacheManager {
taskInstance.setStartTime(taskExecutionContext.getStartTime());
taskInstance.setTaskType(taskInstance.getTaskType());
taskInstance.setExecutePath(taskInstance.getExecutePath());
- taskInstance.setTaskJson(taskInstance.getTaskJson());
taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(),
taskInstance);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 69058a4..d198292 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -145,7 +145,6 @@ public class TaskPriorityQueueConsumer extends Thread {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
-
} catch (Exception e) {
logger.error("dispatcher task error", e);
}
@@ -201,8 +200,6 @@ public class TaskPriorityQueueConsumer extends Thread {
// task type
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
- // task node
- TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(),
TaskNode.class);
Integer userId = taskInstance.getProcessDefine() == null ? 0 :
taskInstance.getProcessDefine().getUserId();
Tenant tenant =
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
userId);
@@ -221,7 +218,7 @@ public class TaskPriorityQueueConsumer extends Thread {
String userQueue =
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ?
tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
- taskInstance.setResources(getResourceFullNames(taskNode));
+ taskInstance.setResources(getResourceFullNames(taskInstance));
SQLTaskExecutionContext sqlTaskExecutionContext = new
SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new
DataxTaskExecutionContext();
@@ -230,21 +227,21 @@ public class TaskPriorityQueueConsumer extends Thread {
// SQL task
if (taskType == TaskType.SQL) {
- setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
+ setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
}
// DATAX task
if (taskType == TaskType.DATAX) {
- setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
+ setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
}
// procedure task
if (taskType == TaskType.PROCEDURE) {
- setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
+ setProcedureTaskRelation(procedureTaskExecutionContext,
taskInstance);
}
if (taskType == TaskType.SQOOP) {
- setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode);
+ setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
}
return TaskExecutionContextBuilder.get()
@@ -262,10 +259,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
- * @param taskNode taskNode
+ * @param taskInstance taskInstance
*/
- private void setProcedureTaskRelation(ProcedureTaskExecutionContext
procedureTaskExecutionContext, TaskNode taskNode) {
- ProcedureParameters procedureParameters =
JSONUtils.parseObject(taskNode.getParams(), ProcedureParameters.class);
+ private void setProcedureTaskRelation(ProcedureTaskExecutionContext
procedureTaskExecutionContext, TaskInstance taskInstance) {
+ ProcedureParameters procedureParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
int datasourceId = procedureParameters.getDatasource();
DataSource datasource =
processService.findDataSourceById(datasourceId);
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
@@ -275,10 +272,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
- * @param taskNode taskNode
+ * @param taskInstance taskInstance
*/
- protected void setDataxTaskRelation(DataxTaskExecutionContext
dataxTaskExecutionContext, TaskNode taskNode) {
- DataxParameters dataxParameters =
JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
+ protected void setDataxTaskRelation(DataxTaskExecutionContext
dataxTaskExecutionContext, TaskInstance taskInstance) {
+ DataxParameters dataxParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
DataSource dbSource =
processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dbTarget =
processService.findDataSourceById(dataxParameters.getDataTarget());
@@ -300,10 +297,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
- * @param taskNode taskNode
+ * @param taskInstance taskInstance
*/
- private void setSqoopTaskRelation(SqoopTaskExecutionContext
sqoopTaskExecutionContext, TaskNode taskNode) {
- SqoopParameters sqoopParameters =
JSONUtils.parseObject(taskNode.getParams(), SqoopParameters.class);
+ private void setSqoopTaskRelation(SqoopTaskExecutionContext
sqoopTaskExecutionContext, TaskInstance taskInstance) {
+ SqoopParameters sqoopParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
// sqoop job type is template set task relation
if
(sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
@@ -331,10 +328,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
- * @param taskNode taskNode
+ * @param taskInstance taskInstance
*/
- private void setSQLTaskRelation(SQLTaskExecutionContext
sqlTaskExecutionContext, TaskNode taskNode) {
- SqlParameters sqlParameters =
JSONUtils.parseObject(taskNode.getParams(), SqlParameters.class);
+ private void setSQLTaskRelation(SQLTaskExecutionContext
sqlTaskExecutionContext, TaskInstance taskInstance) {
+ SqlParameters sqlParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
int datasourceId = sqlParameters.getDatasource();
DataSource datasource =
processService.findDataSourceById(datasourceId);
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
@@ -381,9 +378,9 @@ public class TaskPriorityQueueConsumer extends Thread {
/**
* get resource map key is full name and value is tenantCode
*/
- protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+ protected Map<String, String> getResourceFullNames(TaskInstance
taskInstance) {
Map<String, String> resourcesMap = new HashMap<>();
- AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
+ AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskInstance.getTaskType(),
taskInstance.getTaskParams());
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles =
baseParam.getResourceFilesList();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
index d0b314e..8cf6176 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
@@ -130,7 +130,7 @@ public class ConditionsTaskExecThread extends
MasterBaseTaskExecThread {
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
- this.dependentParameters =
JSONUtils.parseObject(this.taskInstance.getDependency(),
DependentParameters.class);
+ this.dependentParameters = taskInstance.getDependency();
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
index 9f78e0c..4ef4783 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
@@ -102,8 +102,7 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
*/
private void initDependParameters() {
- this.dependentParameters =
JSONUtils.parseObject(this.taskInstance.getDependency(),
- DependentParameters.class);
+ this.dependentParameters = taskInstance.getDependency();
for(DependentTaskModel taskModel :
dependentParameters.getDependTaskList()){
this.dependentTaskList.add(new DependentExecute(
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index be666ae..30aa39b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
-import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -121,10 +122,11 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
* init task timeout parameters
*/
private void initTimeoutParams() {
- String taskJson = taskInstance.getTaskJson();
- TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
- taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
-
+ TaskDefinition taskDefinition =
processService.findTaskDefinition(taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
+ boolean timeoutEnable = taskDefinition.getTimeoutFlag() ==
TimeoutFlag.OPEN;
+ taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable,
+
taskDefinition.getTimeoutNotifyStrategy(),
+
taskDefinition.getTimeout());
if (taskTimeoutParameter.getEnable()) {
checkTimeoutFlag = true;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 41eb150..4f83958 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -380,7 +380,8 @@ public class MasterExecThread implements Runnable {
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList =
getStartTaskInstanceList(processInstance.getCommandParam());
- List<TaskNode> taskNodeList =
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), new HashMap<>());
+ List<TaskNode> taskNodeList =
+
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), new HashMap<>());
forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> {
if (taskNode.isForbidden()) {
@@ -485,8 +486,6 @@ public class MasterExecThread implements Runnable {
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id
taskInstance.setProcessInstanceId(processInstance.getId());
- // task instance node json
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
// task instance type
taskInstance.setTaskType(taskNode.getType());
// task instance whether alert
@@ -556,9 +555,7 @@ public class MasterExecThread implements Runnable {
}
}
result.put(LOCAL_PARAMS, allParam);
- taskNode.setParams(JSONUtils.toJsonString(result));
- // task instance node json
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ taskInstance.setTaskParams(JSONUtils.toJsonString(result));
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 409c2b7..3263554 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -132,8 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed
{
return;
}
- // task node
- TaskNode taskNode =
JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
@@ -148,13 +146,10 @@ public class TaskExecuteThread implements Runnable,
Delayed {
taskExecutionContext.getResources(),
logger);
- taskExecutionContext.setTaskParams(taskNode.getParams());
+
taskExecutionContext.setTaskParams(taskExecutionContext.getTaskParams());
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
- // set task timeout
- setTaskTimeout(taskExecutionContext, taskNode);
-
taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
@@ -238,38 +233,6 @@ public class TaskExecuteThread implements Runnable,
Delayed {
return globalParamsMap;
}
- /**
- * set task timeout
- * @param taskExecutionContext TaskExecutionContext
- * @param taskNode
- */
- private void setTaskTimeout(TaskExecutionContext taskExecutionContext,
TaskNode taskNode) {
- // the default timeout is the maximum value of the integer
- taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
- TaskTimeoutParameter taskTimeoutParameter =
taskNode.getTaskTimeoutParameter();
- if (taskTimeoutParameter.getEnable()) {
- // get timeout strategy
-
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
- switch (taskTimeoutParameter.getStrategy()) {
- case WARN:
- break;
- case FAILED:
- if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval()
* 60) {
-
taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
- }
- break;
- case WARNFAILED:
- if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval()
* 60) {
-
taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
- }
- break;
- default:
- logger.error("not support task timeout strategy: {}",
taskTimeoutParameter.getStrategy());
- throw new IllegalArgumentException("not support task
timeout strategy");
-
- }
- }
- }
/**
* kill task
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index 61058de..9a750e1 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -171,7 +171,6 @@ public class ConditionsTaskTest {
private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance
processInstance) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setProcessInstanceId(processInstance.getId());
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 8b0410b..571e79a 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -60,6 +61,10 @@ public class DependentTaskTest {
*/
public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
+
+ public static final Long TASK_CODE = 1111L;
+ public static final int TASK_VERSION = 1;
+
private ProcessService processService;
/**
@@ -113,6 +118,9 @@ public class DependentTaskTest {
Mockito.when(processService
.findTaskInstanceById(1000))
.thenAnswer(i -> taskInstance);
+
+ Mockito.when(processService.findTaskDefinition(TASK_CODE,
TASK_VERSION))
+ .thenReturn(getTaskDefinition());
}
private void testBasicInit() {
@@ -359,14 +367,25 @@ public class DependentTaskTest {
return taskNode;
}
+ private TaskDefinition getTaskDefinition() { // fixture returned by the mocked processService.findTaskDefinition(TASK_CODE, TASK_VERSION)
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setCode(TASK_CODE);
+ taskDefinition.setVersion(TASK_VERSION);
+ taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE); // not TimeoutFlag.OPEN; presumably keeps timeout checking off for these tests - confirm
+ taskDefinition.setTimeout(0);
+ return taskDefinition;
+ }
+
private void setupTaskInstance(TaskNode taskNode) {
taskInstance = new TaskInstance();
taskInstance.setId(1000);
+ taskInstance.setTaskCode(TASK_CODE);
+ taskInstance.setTaskDefinitionVersion(TASK_VERSION);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setTaskType(taskNode.getType());
+
taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(),
DependentParameters.class));
taskInstance.setName(taskNode.getName());
}
@@ -405,7 +424,6 @@ public class DependentTaskTest {
taskInstance.setName(taskName);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
- taskInstance.setTaskJson("{}");
taskInstance.setState(state);
return taskInstance;
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 518b116..046ca5b 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -148,7 +148,8 @@ public class SubProcessTaskTest {
private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance
processInstance) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ taskInstance.setName("S");
+ taskInstance.setTaskType(TaskType.SUB_PROCESS.toString());
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setProcessInstanceId(processInstance.getId());
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 14b1c61..bf2cf4c 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -110,25 +110,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- + "\"conditionsTask\":false,"
- + "\"depList\":[],"
- + "\"dependence\":\"{}\","
- + "\"forbidden\":false,"
- + "\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,"
- + "\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- + "\"taskTimeoutParameter\":{\"enable\":false,"
- + "\"interval\":0},"
- + "\"timeout\":\"{\\\"enable\\\":false,"
- + "\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -161,13 +142,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
- +
"\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
- + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from
t_journey_user\\\",\\\"preStatements\\\":[],"
- +
"\\\"sqlType\\\":0,\\\"receivers\\\":\\\"[email protected]\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
- +
"\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -212,26 +186,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- +
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
- + "\"forbidden\":false,\"id\":\"tasks-97625\","
- + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
- + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
- + " \\\"postStatements\\\":[],"
- + " \\\"jobSpeedRecord\\\":1000,"
- + " \\\"customConfig\\\":0,"
- + " \\\"dtType\\\":\\\"MYSQL\\\","
- + " \\\"dsType\\\":\\\"MYSQL\\\","
- + " \\\"jobSpeedByte\\\":0,"
- + " \\\"dataSource\\\":80,"
- + " \\\"dataTarget\\\":80,"
- + " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
- + " \\\"preStatements\\\":[]}\","
- + "\"preTasks\":\"[]\","
- +
"\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
- +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"DATAX\","
- + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -274,32 +228,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
- + "\"forbidden\":false,\"id\":\"tasks-63634\","
- + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
- + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
- + " \\\"targetType\\\":\\\"HDFS\\\","
- + "
\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
- + "
\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
- + " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
- + " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
- + "
\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
- + " \\\"modelType\\\":\\\"import\\\","
- + "
\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
- + "
\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
- + " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
- + " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
- + " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
- + " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
- + "
\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
- + " \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SQOOP\","
- + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -342,16 +270,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- +
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
- + "\"forbidden\":false,\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
- + "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -370,24 +288,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- + "\"conditionsTask\":false,"
- + "\"depList\":[],"
- + "\"dependence\":\"{}\","
- + "\"forbidden\":false,"
- + "\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,"
- + "\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
- + "\"timeout\":\"{\\\"enable\\\":false,"
- + "\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
@@ -424,24 +324,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- + "\"conditionsTask\":false,"
- + "\"depList\":[],"
- + "\"dependence\":\"{}\","
- + "\"forbidden\":false,"
- + "\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,"
- + "\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
- + "\"timeout\":\"{\\\"enable\\\":false,"
- + "\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
@@ -477,24 +359,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- + "\"conditionsTask\":false,"
- + "\"depList\":[],"
- + "\"dependence\":\"{}\","
- + "\"forbidden\":false,"
- + "\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,"
- + "\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
- + "\"timeout\":\"{\\\"enable\\\":false,"
- + "\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
@@ -527,32 +391,12 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- + "\"conditionsTask\":false,"
- + "\"depList\":[],"
- + "\"dependence\":\"{}\","
- + "\"forbidden\":false,"
- + "\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,"
- + "\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
- + "\"timeout\":\"{\\\"enable\\\":false,"
- + "\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"NoWorkGroup\"}");
-
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
// task node
- TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(),
TaskNode.class);
- Map<String, String> map =
taskPriorityQueueConsumer.getResourceFullNames(taskNode);
+ Map<String, String> map =
taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
List<Resource> resourcesList = new ArrayList<Resource>();
Resource resource = new Resource();
@@ -598,15 +442,15 @@ public class TaskPriorityQueueConsumerTest {
public void testSetDataxTaskRelation() throws Exception {
DataxTaskExecutionContext dataxTaskExecutionContext = new
DataxTaskExecutionContext();
- TaskNode taskNode = new TaskNode();
- taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setConnectionParams("");
dataSource.setType(DbType.MYSQL);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
-
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
+
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,
taskInstance);
Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
@@ -620,24 +464,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
-
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
- + "\"conditionsTask\":false,"
- + "\"depList\":[],"
- + "\"dependence\":\"{}\","
- + "\"forbidden\":false,"
- + "\"id\":\"tasks-55201\","
- + "\"maxRetryTimes\":0,"
- + "\"name\":\"测试任务\","
- + "\"params\":\"{\\\"rawScript\\\":\\\"echo
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
- + "\"preTasks\":\"[]\","
- + "\"retryInterval\":1,"
- + "\"runFlag\":\"NORMAL\","
- + "\"taskInstancePriority\":\"MEDIUM\","
- + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
- + "\"timeout\":\"{\\\"enable\\\":false,"
- + "\\\"strategy\\\":\\\"\\\"}\","
- + "\"type\":\"SHELL\","
- + "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
index 405ad43..9229e59 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
@@ -114,7 +114,6 @@ public class MasterTaskExecThreadTest {
taskInstance.setTaskType("SHELL");
taskInstance.setId(252612);
taskInstance.setName("C");
- taskInstance.setTaskJson("{}");
taskInstance.setProcessInstanceId(10111);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index ac91e1d..e224fa8 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -87,7 +87,6 @@ public class ShellCommandExecutorTest {
TaskInstance taskInstance = processService.findTaskInstanceById(7657);
- String taskJson = taskInstance.getTaskJson();
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
index dbaa132..caf1551 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
@@ -110,9 +110,7 @@ public class SqlExecutorTest {
TaskInstance taskInstance =
processService.findTaskInstanceById(taskInstId);
- String taskJson = taskInstance.getTaskJson();
- TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
- taskProps.setTaskParams(taskNode.getParams());
+ taskProps.setTaskParams(taskInstance.getTaskParams());
// custom logger
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bccd926..2c6349c 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -110,6 +110,8 @@ import
org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+import org.apache.commons.collections.map.HashedMap;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -1160,9 +1162,12 @@ public class ProcessService {
ProcessInstanceMap instanceMap,
TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance,
childInstance);
- TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(),
TaskNode.class);
- Map<String, Object> subProcessParam =
JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
- Integer childDefineId =
Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)));
+ TaskDefinition taskDefinition =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+ task.getTaskCode(), task.getTaskDefinitionVersion()
+ );
+ Map<String, String> subProcessParam =
JSONUtils.toMap(taskDefinition.getTaskParams());
+ Integer childDefineId =
Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
+
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap =
this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
@@ -1663,8 +1668,7 @@ public class ProcessService {
if (row == null || row.size() == 0) {
return;
}
- TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(),
TaskNode.class);
- Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(),
String.class, Object.class);
+ Map<String, Object> taskParams =
JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class);
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
@@ -1689,9 +1693,7 @@ public class ProcessService {
}
}
taskParams.put(LOCAL_PARAMS, allParam);
- taskNode.setParams(JSONUtils.toJsonString(taskParams));
- // task instance node json
- taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
String params4ProcessString = JSONUtils.toJsonString(params4Property);
int updateCount =
this.processInstanceMapper.updateGlobalParamsById(params4ProcessString,
processInstance.getId());
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}",
updateCount, params4ProcessString, processInstance.getId());
@@ -2392,7 +2394,7 @@ public class ProcessService {
taskDefinitionMap.get(preTaskName).getVersion(),
taskDefinitionMap.get(taskNode.getName()).getCode(),
taskDefinitionMap.get(taskNode.getName()).getVersion(),
- ConditionType.of("none"),
+ ConditionType.NONE,
taskNode.getConditionResult(),
now,
now));
@@ -2542,6 +2544,77 @@ public class ProcessService {
}
/**
+     * Build a TaskNode from the task definition (task code + definition version) referenced by a task instance.
+     * Copies code, version, name, description, type, run flag, retry settings, params, priority and worker group.
+     *
+     * @param taskInstance task instance supplying the task code, definition version and process instance id
+     * @return the populated TaskNode, or null if no matching task definition exists
+     */
+    public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) {
+        TaskNode taskNode = new TaskNode();
+        ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
+        TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                taskInstance.getTaskCode(),
+                taskInstance.getTaskDefinitionVersion());
+        if (taskDefinition == null) {
+            return null;
+        }
+        List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
+                taskInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()
+        );
+        Map<Long, Integer> taskCodeMap = new HashedMap();
+        // NOTE(review): taskCodeMap is filled below but never read in this method - confirm whether the relation lookup can be dropped
+        taskRelationList.forEach(relation -> taskCodeMap.putIfAbsent(relation.getPostTaskCode(), relation.getPostTaskVersion()));
+
+        taskNode.setCode(String.valueOf(taskDefinition.getCode()));
+        taskNode.setVersion(taskDefinition.getVersion());
+        taskNode.setName(taskDefinition.getName());
+        taskNode.setDesc(taskDefinition.getDescription());
+        taskNode.setType(taskDefinition.getTaskType().getDescp());
+        // NOTE(review): Flag.YES is mapped to FORBIDDEN and any other flag to NORMAL - this looks inverted, confirm against the Flag semantics
+        taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL);
+        taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
+        taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());
+        taskNode.setParams(taskDefinition.getTaskParams());
+        taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority());
+        taskNode.setWorkerGroup(taskDefinition.getWorkerGroup());
+        return taskNode;
+    }
+
+ /**
+ * find task definition by code and version
+ *
+ * @param taskCode code of the task definition to look up
+ * @param taskDefinitionVersion version of the task definition to look up
+ * @return whatever taskDefinitionLogMapper.queryByDefinitionCodeAndVersion returns for that code and version
+ */
+ public TaskDefinition findTaskDefinition(long taskCode, int
taskDefinitionVersion) {
+ return
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode,
taskDefinitionVersion);
+ }
+
+    /**
+     * query task definition list by process code and process version
+     *
+     * @param processCode code of the process definition whose task relations are read
+     * @param processVersion version of the process definition whose task relations are read
+     * @return one definition per distinct post task code; NOTE(review): built via a raw ArrayList from TaskDefinition values, confirm the mapper yields TaskDefinitionLog
+     */
+    public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) {
+        List<ProcessTaskRelationLog> processTaskRelationLogs =
+                processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
+        Map<Long, TaskDefinition> postTaskDefinitionMap = new HashedMap();
+        processTaskRelationLogs.forEach(processTaskRelationLog -> {
+            Long code = processTaskRelationLog.getPostTaskCode();
+            int version = processTaskRelationLog.getPostTaskVersion();
+            if (!postTaskDefinitionMap.containsKey(code)) { // query a post task only the first time its code is seen
+                TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
+                postTaskDefinitionMap.putIfAbsent(code, taskDefinition);
+            }
+        });
+        return new ArrayList(postTaskDefinitionMap.values());
+    }
+
+ /**
* parse locations
*
* @param locations processDefinition locations
@@ -2565,7 +2638,7 @@ public class ProcessService {
* add authorized resources
*
* @param ownResources own resources
- * @param userId userId
+ * @param userId userId
*/
private void addAuthorizedResources(List<Resource> ownResources, int
userId) {
List<Integer> relationResourceIds =
resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 1593374..20ba75c 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -114,7 +114,7 @@ public class ProcessServiceTest {
parentInstance.setWarningGroupId(0);
TaskInstance task = new TaskInstance();
- task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}");
+ task.setTaskParams("{\"processDefinitionId\":100}}");
task.setId(10);
ProcessInstance childInstance = null;
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index e081590..ded1038 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -830,6 +830,7 @@ CREATE TABLE `t_ds_task_instance` (
`retry_times` int(4) DEFAULT '0' COMMENT 'task retry times',
`pid` int(4) DEFAULT NULL COMMENT 'pid of task',
`app_link` text COMMENT 'yarn app id',
+ `task_params` text COMMENT 'job custom parameters',
`flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task
failed ',
`max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 2b381a4..5d0c214 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -689,6 +689,7 @@ CREATE TABLE t_ds_task_instance (
retry_times int DEFAULT '0' ,
pid int DEFAULT NULL ,
app_link text ,
+ task_params text ,
flag int DEFAULT '1' ,
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,