This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
new ddfdc5dc62 [Bugfix-12568] [Master] A retry task submitted in
advance will block other tasks (#12570)
ddfdc5dc62 is described below
commit ddfdc5dc6258e764d075b1a81d4a4d0a8aa5ef47
Author: JinYong Li <[email protected]>
AuthorDate: Wed Nov 2 11:55:17 2022 +0800
[Bugfix-12568] [Master] A retry task submitted in advance will block
other tasks (#12570)
* fix 12568
* fix 12568
* code style
Co-authored-by: JinyLeeChina <[email protected]>
---
.../server/master/runner/WorkflowExecuteThread.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 4d84227c40..89090cb639 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -1452,6 +1452,7 @@ public class WorkflowExecuteThread implements Runnable {
private void submitStandByTask() {
try {
int length = readyToSubmitTaskQueue.size();
+ List<TaskInstance> skipSubmitInstances = new ArrayList<>();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
if (task == null) {
@@ -1472,6 +1473,8 @@ public class WorkflowExecuteThread implements Runnable {
long failedTimeInterval = DateUtils.differSec(new
Date(), retryTask.getEndTime());
if ((long) retryTask.getRetryInterval() *
SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) {
logger.info("task name: {} retry waiting has not
exceeded the interval time, and skip submission this time, task id:{}",
task.getName(), task.getId());
+ readyToSubmitTaskQueue.remove(task);
+ skipSubmitInstances.add(task);
continue;
}
}
@@ -1512,6 +1515,10 @@ public class WorkflowExecuteThread implements Runnable {
logger.info("remove task {},id:{} , because depend result
: {}", task.getName(), task.getId(), dependResult);
}
}
+ for (TaskInstance task : skipSubmitInstances) {
+ readyToSubmitTaskQueue.put(task);
+ }
+ skipSubmitInstances.clear();
} catch (Exception e) {
logger.error("submit standby task error", e);
}