This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
new 041e1fd3e6 Set master's task running status in `runTask` to avoid the
task group acquire failed, but the task status is in running (#11451) (#12011)
041e1fd3e6 is described below
commit 041e1fd3e61292427b5746c92ce850405ef2f2c4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Sep 17 22:55:40 2022 +0800
Set master's task running status in `runTask` to avoid the task group
acquire failed, but the task status is in running (#11451) (#12011)
(cherry picked from commit 05589606a2fe6b7287b146e82f927b1eceaed701)
---
.../master/runner/task/BlockingTaskProcessor.java | 6 ++-
.../master/runner/task/ConditionTaskProcessor.java | 6 ++-
.../master/runner/task/SwitchTaskProcessor.java | 43 ++++++++++------------
3 files changed, 28 insertions(+), 27 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 1fa6b28625..494b9e8434 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -110,19 +110,21 @@ public class BlockingTaskProcessor extends
BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
- initTaskParameters();
- logger.info("blocking task start");
+ logger.info("blocking task submit success");
return true;
}
@Override
protected boolean runTask() {
+ logger.info("blocking task starting");
+ initTaskParameters();
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
+ logger.info("blocking task finished");
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 4749e20f0f..f98174b19d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -67,19 +67,21 @@ public class ConditionTaskProcessor extends
BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
- initTaskParameters();
- logger.info("condition task start");
+ logger.info("condition task submit success");
return true;
}
@Override
public boolean runTask() {
+ initTaskParameters();
+ logger.info("condition task start");
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
+ logger.info("condition task finished");
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 444d41622c..ffd124c18b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
-
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -30,9 +31,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -41,7 +39,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import com.google.auto.service.AutoService;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
/**
* switch task processor
@@ -64,29 +62,28 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
-
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
- taskInstance.getProcessInstanceId(),
- taskInstance.getId()));
- taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
- taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- taskInstance.setStartTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ logger.info("switch task submit success");
return true;
}
@Override
public boolean runTask() {
- try {
- if (!this.taskInstance().getState().typeIsFinished() &&
setSwitchResult()) {
- endTaskState();
- }
- } catch (Exception e) {
- logger.error("update work flow {} switch task {} state error:",
- this.processInstance.getId(),
- this.taskInstance.getId(),
- e);
+ logger.info("switch task starting");
+ taskInstance.setLogPath(
+ LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getId()));
+ taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+ taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ taskInstance.setStartTime(new Date());
+ processService.updateTaskInstance(taskInstance);
+
+ if (!this.taskInstance().getState().typeIsFinished()) {
+ setSwitchResult();
}
+ endTaskState();
+ logger.info("switch task finished");
return true;
}