This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a8c592b [Bug-7788][MasterServer] fix submit duplicate tasks sometimes
when retry (#7809)
a8c592b is described below
commit a8c592bd93f9705666546204a34a7e48acd8c31b
Author: wind <[email protected]>
AuthorDate: Tue Jan 11 22:54:42 2022 +0800
[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry
(#7809)
* [Bug-7788] fix submit duplicate tasks sometimes when retry
* [Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry
Co-authored-by: caishunfeng <[email protected]>
---
.../master/runner/WorkflowExecuteThread.java | 39 +++++++++++++++++++---
1 file changed, 35 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 2d83f9a..d8d8720 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -81,6 +81,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -1375,13 +1376,22 @@ public class WorkflowExecuteThread {
* @param taskInstance task instance
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
- logger.info("add task to stand by list: {}", taskInstance.getName());
try {
- if (!readyToSubmitTaskQueue.contains(taskInstance)) {
- readyToSubmitTaskQueue.put(taskInstance);
+ if (readyToSubmitTaskQueue.contains(taskInstance)) {
+ logger.warn("task was found in ready submit queue, task
code:{}", taskInstance.getTaskCode());
+ return;
+ }
+ // need to check whether a task with the same task code and version is already active (not failed)
+ boolean active = hadNotFailTask(taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
+ if (active) {
+ logger.warn("task was found in active task list, task
code:{}", taskInstance.getTaskCode());
+ return;
}
+ logger.info("add task to stand by list, task name:{}, task id:{},
task code:{}",
+ taskInstance.getName(), taskInstance.getId(),
taskInstance.getTaskCode());
+ readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
- logger.error("add task instance to readyToSubmitTaskQueue error,
taskName: {}", taskInstance.getName(), e);
+ logger.error("add task instance to readyToSubmitTaskQueue,
taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
}
}
@@ -1626,4 +1636,25 @@ public class WorkflowExecuteThread {
return false;
}
}
+
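+ /**
+ * check whether taskInstanceMap holds a task instance with the given task code and version whose state is not a failure type
+ * @param taskCode task code to match
+ * @param version task definition version to match
+ * @return true if such a non-failed task instance exists, false otherwise
+ */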
+ private boolean hadNotFailTask(long taskCode, int version) {
+ boolean result = false;
+ for (Entry<Integer, TaskInstance> entry : taskInstanceMap.entrySet()) {
+ TaskInstance taskInstance = entry.getValue();
+ if (taskInstance.getTaskCode() == taskCode &&
taskInstance.getTaskDefinitionVersion() == version) {
+ if (!taskInstance.getState().typeIsFailure()) {
+ result = true;
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
}