This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.7-prepare by this push:
new 200d4554f1 [Fix-12422][MasterServer] The task needing fault tolerance
cannot be submitted (#12423)
200d4554f1 is described below
commit 200d4554f18b14b3cebaf9e034271f8967528b36
Author: JinYong Li <[email protected]>
AuthorDate: Wed Oct 19 10:41:27 2022 +0800
[Fix-12422][MasterServer] The task needing fault tolerance cannot be
submitted (#12423)
* fix complement data bug
* fix 12167
* fix complement data bug
* fix 12422
Co-authored-by: JinyLeeChina <[email protected]>
---
.../server/master/runner/WorkflowExecuteThread.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index b90453349f..269d921aba 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -22,6 +22,7 @@ import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -421,8 +422,11 @@ public class WorkflowExecuteThread implements Runnable {
private void taskFinished(TaskInstance task) {
logger.info("work flow {} task {} state:{} ", processInstance.getId(),
task.getId(), task.getState());
- if (task.isDependTask() && task.getState() ==
ExecutionStatus.NEED_FAULT_TOLERANCE) {
- logger.info("resubmit NEED_FAULT_TOLERANCE dependent task");
+ if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+ logger.info("resubmit NEED_FAULT_TOLERANCE {} task", task.getId());
+ if (task.getMaxRetryTimes() == 0) {
+ task.setRetryTimes(task.getRetryTimes() + 1);
+ }
addTaskToStandByList(task);
submitStandByTask();
return;
@@ -1464,9 +1468,12 @@ public class WorkflowExecuteThread implements Runnable {
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
- if (retryTask != null &&
!retryTask.retryTaskIntervalOverTime()) {
- logger.info("task name: {} retry waiting has not
exceeded the interval time, and skip submission this time, task id:{}",
task.getName(), task.getId());
- continue;
+ if (retryTask != null && retryTask.getState() ==
ExecutionStatus.FAILURE && retryTask.getMaxRetryTimes() !=0 &&
retryTask.getRetryInterval() != 0) {
+ long failedTimeInterval = DateUtils.differSec(new
Date(), retryTask.getEndTime());
+ if ((long) retryTask.getRetryInterval() *
SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) {
+ logger.info("task name: {} retry waiting has not
exceeded the interval time, and skip submission this time, task id:{}",
task.getName(), task.getId());
+ continue;
+ }
}
}
//init varPool only this task is the first time running