This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 862565a [Feature][JsonSplit] Fix dependTask bug (#5360)
862565a is described below
commit 862565a7c7d404845a43fb5db3eda28fc5cbf05a
Author: JinyLeeChina <[email protected]>
AuthorDate: Fri Apr 23 14:23:04 2021 +0800
[Feature][JsonSplit] Fix dependTask bug (#5360)
* update SnowFlake
* update processDefinite from processInstance
* update processDefinite from processInstance
* Fix task logger path
* Fix dependTask bug
Co-authored-by: JinyLeeChina <[email protected]>
---
.../dolphinscheduler/dao/entity/TaskInstance.java | 1 -
.../dao/mapper/TaskInstanceMapper.xml | 4 +-
.../master/runner/DependentTaskExecThread.java | 61 ++++++++++------------
.../server/master/runner/MasterExecThread.java | 53 ++++++++++---------
.../server/utils/DependentExecute.java | 6 +--
.../service/process/ProcessService.java | 1 -
6 files changed, 60 insertions(+), 66 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index f1da73f..b417d2f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -241,7 +241,6 @@ public class TaskInstance implements Serializable {
/**
* task params
*/
- @TableField(exist = false)
private String taskParams;
public void init(String host, Date startTime, String executePath) {
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 96ec4bb..8c96e44 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,13 +22,13 @@
id, name, task_type, process_instance_id, task_code,
task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag,
retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority,
worker_group, executor_id,
- first_submit_time, delay_time, var_pool
+ first_submit_time, delay_time, task_params, var_pool
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code,
${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state,
${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host,
${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag,
${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times,
${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
- ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool
+ ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params,
${alias}.var_pool
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
index 4d13f3b..6b2bceb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
@@ -26,12 +26,11 @@ import
org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
@@ -62,13 +61,13 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
/**
* dependent date
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date dependentDate;
/**
* constructor of MasterBaseTaskExecThread
*
- * @param taskInstance task instance
+ * @param taskInstance task instance
*/
public DependentTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
@@ -78,7 +77,7 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
@Override
public Boolean submitWaitComplete() {
- try{
+ try {
logger.info("dependent task start");
this.taskInstance = submit();
logger =
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@@ -92,8 +91,8 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
initDependParameters();
waitTaskQuit();
updateTaskState();
- }catch (Exception e){
- logger.error("dependent task run exception" , e);
+ } catch (Exception e) {
+ logger.error("dependent task run exception", e);
}
return true;
}
@@ -102,16 +101,13 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
* init dependent parameters
*/
private void initDependParameters() {
-
this.dependentParameters = taskInstance.getDependency();
-
- for(DependentTaskModel taskModel :
dependentParameters.getDependTaskList()){
- this.dependentTaskList.add(new DependentExecute(
- taskModel.getDependItemList(), taskModel.getRelation()));
+ for (DependentTaskModel taskModel :
dependentParameters.getDependTaskList()) {
+ this.dependentTaskList.add(new
DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
}
- if(this.processInstance.getScheduleTime() != null){
+ if (this.processInstance.getScheduleTime() != null) {
this.dependentDate = this.processInstance.getScheduleTime();
- }else{
+ } else {
this.dependentDate = new Date();
}
}
@@ -121,9 +117,9 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
*/
private void updateTaskState() {
ExecutionStatus status;
- if(this.cancel){
+ if (this.cancel) {
status = ExecutionStatus.KILL;
- }else{
+ } else {
DependResult result = getTaskDependResult();
status = (result == DependResult.SUCCESS) ?
ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
@@ -144,8 +140,8 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
return true;
}
while (Stopper.isRunning()) {
- try{
- if(this.processInstance == null){
+ try {
+ if (this.processInstance == null) {
logger.error("process instance not exists , master task
exec thread exit");
return true;
}
@@ -153,12 +149,12 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed();
}
- if(this.cancel || this.processInstance.getState() ==
ExecutionStatus.READY_STOP){
+ if (this.cancel || this.processInstance.getState() ==
ExecutionStatus.READY_STOP) {
cancelTaskInstance();
break;
}
- if ( allDependentTaskFinish() ||
taskInstance.getState().typeIsFinished()){
+ if (allDependentTaskFinish() ||
taskInstance.getState().typeIsFinished()) {
break;
}
// update process task
@@ -166,7 +162,7 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
processInstance =
processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
- logger.error("exception",e);
+ logger.error("exception", e);
if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task
id:{}",
processInstance.getId(), taskInstance.getId());
@@ -196,20 +192,20 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
/**
* judge all dependent tasks finish
+ *
* @return whether all dependent tasks finish
*/
- private boolean allDependentTaskFinish(){
+ private boolean allDependentTaskFinish() {
boolean finish = true;
- for(DependentExecute dependentExecute : dependentTaskList){
- for(Map.Entry<String, DependResult> entry:
dependentExecute.getDependResultMap().entrySet()) {
- if(!dependResultMap.containsKey(entry.getKey())){
+ for (DependentExecute dependentExecute : dependentTaskList) {
+ for (Map.Entry<String, DependResult> entry :
dependentExecute.getDependResultMap().entrySet()) {
+ if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log
- logger.info("dependent item complete {} {},{}",
- DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
+ logger.info("dependent item complete {} {},{}",
DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
}
}
- if(!dependentExecute.finish(dependentDate)){
+ if (!dependentExecute.finish(dependentDate)) {
finish = false;
}
}
@@ -218,17 +214,16 @@ public class DependentTaskExecThread extends
MasterBaseTaskExecThread {
/**
* get dependent result
+ *
* @return DependResult
*/
- private DependResult getTaskDependResult(){
+ private DependResult getTaskDependResult() {
List<DependResult> dependResultList = new ArrayList<>();
- for(DependentExecute dependentExecute : dependentTaskList){
+ for (DependentExecute dependentExecute : dependentTaskList) {
DependResult dependResult =
dependentExecute.getModelDependResult(dependentDate);
dependResultList.add(dependResult);
}
- DependResult result = DependentUtils.getDependResultForRelation(
- this.dependentParameters.getRelation(), dependResultList
- );
+ DependResult result =
DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(),
dependResultList);
logger.info("dependent task completed, dependent result:{}", result);
return result;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 23121cb..a0c560a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -34,6 +34,7 @@ import
org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -448,13 +449,14 @@ public class MasterExecThread implements Runnable {
* find task instance in db.
* in case submit more than one same name task in the same time.
*
- * @param taskName task name
+ * @param taskCode task code
+ * @param taskVersion task version
* @return TaskInstance
*/
- private TaskInstance findTaskIfExists(String taskName) {
+ private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(this.processInstance.getId());
for (TaskInstance taskInstance : taskInstanceList) {
- if (taskInstance.getName().equals(taskName)) {
+ if (taskInstance.getTaskCode() == taskCode &&
taskInstance.getTaskDefinitionVersion() == taskVersion) {
return taskInstance;
}
}
@@ -465,20 +467,19 @@ public class MasterExecThread implements Runnable {
* encapsulation task
*
* @param processInstance process instance
- * @param nodeName node name
+ * @param taskNode taskNode
* @return TaskInstance
*/
- private TaskInstance createTaskInstance(ProcessInstance processInstance,
String nodeName,
- TaskNode taskNode) {
+ private TaskInstance createTaskInstance(ProcessInstance processInstance,
TaskNode taskNode) {
//update processInstance for update the globalParams
this.processInstance =
this.processService.findProcessInstanceById(this.processInstance.getId());
- TaskInstance taskInstance = findTaskIfExists(nodeName);
+ TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(),
taskNode.getVersion());
if (taskInstance == null) {
taskInstance = new TaskInstance();
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
- taskInstance.setName(nodeName);
+ taskInstance.setName(taskNode.getName());
// task instance state
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id
@@ -518,27 +519,28 @@ public class MasterExecThread implements Runnable {
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
- //get process global
- setProcessGlobal(taskNode, taskInstance);
+ if
(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) {
+ taskInstance.setTaskParams(taskNode.getDependence());
+ } else {
+
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getParams()));
+ }
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
return taskInstance;
}
- private void setProcessGlobal(TaskNode taskNode, TaskInstance
taskInstance) {
+ private String globalParamToTaskParams(String params) {
String globalParams = this.processInstance.getGlobalParams();
- if (StringUtils.isNotEmpty(globalParams)) {
- Map<String, String> globalMap =
processService.getGlobalParamMap(globalParams);
- if (globalMap != null && globalMap.size() != 0) {
- setGlobalMapToTask(taskNode, taskInstance, globalMap);
- }
+ if (StringUtils.isBlank(globalParams)) {
+ return params;
}
- }
-
- private void setGlobalMapToTask(TaskNode taskNode, TaskInstance
taskInstance, Map<String, String> globalMap) {
- // the param save in localParams
- Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(),
String.class, Object.class);
+ Map<String, String> globalMap =
processService.getGlobalParamMap(globalParams);
+ if (globalMap == null || globalMap.size() == 0) {
+ return params;
+ }
+ // the process global param save in localParams
+ Map<String, Object> result = JSONUtils.toMap(params, String.class,
Object.class);
Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@@ -552,8 +554,8 @@ public class MasterExecThread implements Runnable {
}
}
result.put(LOCAL_PARAMS, allParam);
- taskInstance.setTaskParams(JSONUtils.toJsonString(result));
}
+ return JSONUtils.toJsonString(result);
}
private void submitPostNode(String parentNodeName) {
@@ -567,9 +569,10 @@ public class MasterExecThread implements Runnable {
throw new RuntimeException();
}
TaskNode taskNodeObject = dag.getNode(taskNode);
- VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
- taskInstances.add(createTaskInstance(processInstance, taskNode,
- taskNodeObject));
+ if
(!TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNodeObject.getType())) {
+ VarPoolUtils.setTaskNodeLocalParams(taskNodeObject,
propToValue);
+ }
+ taskInstances.add(createTaskInstance(processInstance,
taskNodeObject));
}
// if previous node success , post node submit
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 09c98fc..f273afd 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -87,7 +87,7 @@ public class DependentExecute {
*/
private DependResult getDependentResultForItem(DependentItem
dependentItem, Date currentTime){
List<DateInterval> dateIntervals =
DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
- return calculateResultForTasks(dependentItem, dateIntervals );
+ return calculateResultForTasks(dependentItem, dateIntervals);
}
/**
@@ -257,9 +257,7 @@ public class DependentExecute {
}
dependResultList.add(dependResult);
}
- modelDependResult = DependentUtils.getDependResultForRelation(
- this.relation, dependResultList
- );
+ modelDependResult =
DependentUtils.getDependResultForRelation(this.relation, dependResultList);
return modelDependResult;
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 89a89be..7433824 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1247,7 +1247,6 @@ public class ProcessService {
if (taskInstance.isSubProcess()) {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else {
-
if (processInstanceState != ExecutionStatus.READY_STOP
&& processInstanceState !=
ExecutionStatus.READY_PAUSE) {
// failure task set invalid