This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e6c57430e3 [DSIP-44] Set a delay time to TaskExecuteRunnable if it 
dispatched failed (#16069)
e6c57430e3 is described below

commit e6c57430e33df31b56bb0f1a4ecfaecd04e6cfc9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 27 21:53:11 2024 +0800

    [DSIP-44] Set a delay time to TaskExecuteRunnable if it dispatched failed 
(#16069)
---
 .../builder/TaskExecutionContextBuilder.java       |   1 -
 .../master/runner/BaseTaskExecuteRunnable.java     |  28 ++++
 .../master/runner/DefaultTaskExecuteRunnable.java  |   2 +-
 .../runner/GlobalTaskDispatchWaitingQueue.java     |  35 +++-
 .../GlobalTaskDispatchWaitingQueueLooper.java      |  53 +++---
 .../runner/PriorityDelayTaskExecuteRunnable.java   |  85 ----------
 .../server/master/runner/TaskExecuteRunnable.java  |   3 +-
 .../BaseTaskExecuteRunnableDispatchOperator.java   |  14 +-
 .../server/master/runner/queue/DelayEntry.java     |  82 +++++++++
 .../PriorityDelayQueue.java}                       |  32 ++--
 .../runner/GlobalTaskDispatchWaitingQueueTest.java | 184 +++++++++++++++++++++
 .../PriorityDelayTaskExecuteRunnableTest.java      |  67 --------
 .../master/runner/queue/DelayEntryTest.java}       |  32 ++--
 .../plugin/task/api/TaskExecutionContext.java      |  12 +-
 14 files changed, 385 insertions(+), 245 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
index 832c1b336b..5990a53e0f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
@@ -66,7 +66,6 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
         
taskExecutionContext.setEnvironmentConfig(taskInstance.getEnvironmentConfig());
         taskExecutionContext.setHost(taskInstance.getHost());
-        taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
         taskExecutionContext.setVarPool(taskInstance.getVarPool());
         taskExecutionContext.setDryRun(taskInstance.getDryRun());
         taskExecutionContext.setTestFlag(taskInstance.getTestFlag());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
index fefdbf3493..2e41173c4b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
@@ -52,4 +52,32 @@ public abstract class BaseTaskExecuteRunnable implements 
TaskExecuteRunnable {
         return taskExecutionContext;
     }
 
+    @Override
+    public int compareTo(TaskExecuteRunnable other) {
+        if (other == null) {
+            return 1;
+        }
+        int workflowPriorityCompareResult = Integer.compare(workflowInstance.getProcessInstancePriority().getCode(),
+                other.getWorkflowInstance().getProcessInstancePriority().getCode());
+        if (workflowPriorityCompareResult != 0) {
+            return workflowPriorityCompareResult;
+        }
+
+        // smaller number, higher priority (applies to the workflow priority code above as well)
+        int taskPriorityCompareResult = Integer.compare(taskInstance.getTaskInstancePriority().getCode(),
+                other.getTaskInstance().getTaskInstancePriority().getCode());
+        if (taskPriorityCompareResult != 0) {
+            return taskPriorityCompareResult;
+        }
+
+        // larger number, higher priority: arguments are swapped; Integer.compare avoids subtraction overflow
+        int taskGroupPriorityCompareResult =
+                Integer.compare(other.getTaskInstance().getTaskGroupPriority(), taskInstance.getTaskGroupPriority());
+        if (taskGroupPriorityCompareResult != 0) {
+            return taskGroupPriorityCompareResult;
+        }
+        // earlier submit time, higher priority
+        return taskInstance.getFirstSubmitTime().compareTo(other.getTaskInstance().getFirstSubmitTime());
+    }
+
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
index c1b13717bd..ba4f447216 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
 
-public class DefaultTaskExecuteRunnable extends 
PriorityDelayTaskExecuteRunnable {
+public class DefaultTaskExecuteRunnable extends BaseTaskExecuteRunnable {
 
     private final TaskExecuteRunnableOperatorManager 
taskExecuteRunnableOperatorManager;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index f03bd6b903..21d6c890f5 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -17,7 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import java.util.concurrent.DelayQueue;
+import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -25,26 +26,42 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 /**
- * The class is used to store {@link TaskExecuteRunnable} which needs to be 
dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link 
DelayQueue},
- * if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be 
consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
+ * The class is used to store {@link TaskExecuteRunnable} which needs to be 
dispatched. The {@link TaskExecuteRunnable}
+ * will be stored in {@link PriorityDelayQueue}, if the {@link 
TaskExecuteRunnable}'s delay time is 0, then it will be
+ * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
+ * <p>
+ * The order of {@link TaskExecuteRunnable} in the {@link PriorityDelayQueue} 
is determined by {@link TaskExecuteRunnable#compareTo}.
  */
 @Slf4j
 @Component
 public class GlobalTaskDispatchWaitingQueue {
 
-    private final DelayQueue<DefaultTaskExecuteRunnable> queue = new 
DelayQueue<>();
+    private final PriorityDelayQueue<DelayEntry<TaskExecuteRunnable>> 
priorityDelayQueue = new PriorityDelayQueue<>();
 
-    public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable 
priorityTaskExecuteRunnable) {
-        queue.put(priorityTaskExecuteRunnable);
+    /**
+     * Submit a {@link TaskExecuteRunnable} with delay time 0, it will be 
consumed immediately.
+     */
+    public void dispatchTaskExecuteRunnable(TaskExecuteRunnable 
taskExecuteRunnable) {
+        dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, 0);
     }
 
+    /**
+     * Submit a {@link TaskExecuteRunnable} with delay time, if the delay time 
<= 0 then it can be consumed.
+     */
+    public void dispatchTaskExecuteRunnableWithDelay(TaskExecuteRunnable 
taskExecuteRunnable, long delayTimeMills) {
+        priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, 
taskExecuteRunnable));
+    }
+
+    /**
+     * Consume {@link TaskExecuteRunnable} from the {@link 
PriorityDelayQueue}, only the delay time <= 0 can be consumed.
+     */
     @SneakyThrows
-    public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() {
-        return queue.take();
+    public TaskExecuteRunnable takeTaskExecuteRunnable() {
+        return priorityDelayQueue.take().getData();
     }
 
     public int getWaitingDispatchTaskNumber() {
-        return queue.size();
+        return priorityDelayQueue.size();
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index eabbdd8e10..5cfb285c28 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -18,13 +18,11 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
-import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -43,10 +41,6 @@ public class GlobalTaskDispatchWaitingQueueLooper extends 
BaseDaemonThread imple
 
     private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
 
-    private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new 
AtomicInteger();
-
-    private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;
-
     public GlobalTaskDispatchWaitingQueueLooper() {
         super("GlobalTaskDispatchWaitingQueueLooper");
     }
@@ -64,29 +58,34 @@ public class GlobalTaskDispatchWaitingQueueLooper extends 
BaseDaemonThread imple
 
     @Override
     public void run() {
-        DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
         while (RUNNING_FLAG.get()) {
-            defaultTaskExecuteRunnable = 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
-            try {
-                TaskExecutionStatus status = 
defaultTaskExecuteRunnable.getTaskInstance().getState();
-                if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status 
!= TaskExecutionStatus.DELAY_EXECUTION) {
-                    log.warn("The TaskInstance {} state is : {}, will not 
dispatch",
-                            
defaultTaskExecuteRunnable.getTaskInstance().getName(), status);
-                    continue;
-                }
+            doDispatch();
+        }
+    }
 
-                TaskDispatcher taskDispatcher =
-                        
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
-                taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
-                DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
-            } catch (Exception e) {
-                
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
-                
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
-                if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > 
MAX_DISPATCHED_FAILED_TIMES) {
-                    ThreadUtils.sleep(10 * 1000L);
-                }
-                log.error("Dispatch Task: {} failed", 
defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
+    void doDispatch() {
+        final TaskExecuteRunnable taskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
+        TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
+        if (taskInstance == null) {
+            // This case shouldn't happen, but if it does, log an error and continue
+            log.error("The TaskInstance is null, drop it(This case shouldn't happen)");
+            return;
+        }
+        try {
+            TaskExecutionStatus status = taskInstance.getState();
+            if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) {
+                log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status);
+                return;
             }
+            taskDispatchFactory.getTaskDispatcher(taskInstance).dispatchTask(taskExecuteRunnable);
+        } catch (Exception e) {
+            // If dispatch failed, put the task back into the queue so that it is retried after a waiting time.
+            // The waiting time is increaseDispatchFailTimes() * 1s (presumably the updated failure count),
+            // capped with Math.min so that it never exceeds 60 seconds.
+            long waitingTimeMills = Math.min(
+                    taskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
+            globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, waitingTimeMills);
+            log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
         }
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
deleted file mode 100644
index 255ec6c8ac..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-public abstract class PriorityDelayTaskExecuteRunnable extends 
BaseTaskExecuteRunnable implements Delayed {
-
-    public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance,
-                                            TaskInstance taskInstance,
-                                            TaskExecutionContext 
taskExecutionContext) {
-        super(workflowInstance, taskInstance, taskExecutionContext);
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-        return unit.convert(
-                
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
-                        taskExecutionContext.getDelayTime() * 60L),
-                TimeUnit.SECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-        if (o == null) {
-            return 1;
-        }
-        int delayTimeCompareResult =
-                Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
-        if (delayTimeCompareResult != 0) {
-            return delayTimeCompareResult;
-        }
-        PriorityDelayTaskExecuteRunnable other = 
(PriorityDelayTaskExecuteRunnable) o;
-        // the smaller dispatch fail times, the higher priority
-        int dispatchFailTimesCompareResult = 
taskExecutionContext.getDispatchFailTimes()
-                - other.getTaskExecutionContext().getDispatchFailTimes();
-        if (dispatchFailTimesCompareResult != 0) {
-            return dispatchFailTimesCompareResult;
-        }
-        int workflowInstancePriorityCompareResult = 
workflowInstance.getProcessInstancePriority().getCode()
-                - 
other.getWorkflowInstance().getProcessInstancePriority().getCode();
-        if (workflowInstancePriorityCompareResult != 0) {
-            return workflowInstancePriorityCompareResult;
-        }
-        long workflowInstanceIdCompareResult = 
workflowInstance.getId().compareTo(other.getWorkflowInstance().getId());
-        if (workflowInstanceIdCompareResult != 0) {
-            return workflowInstancePriorityCompareResult;
-        }
-        int taskInstancePriorityCompareResult = 
taskInstance.getTaskInstancePriority().getCode()
-                - other.getTaskInstance().getTaskInstancePriority().getCode();
-        if (taskInstancePriorityCompareResult != 0) {
-            return taskInstancePriorityCompareResult;
-        }
-        // larger number, higher priority
-        int taskGroupPriorityCompareResult =
-                taskInstance.getTaskGroupPriority() - 
other.getTaskInstance().getTaskGroupPriority();
-        if (taskGroupPriorityCompareResult != 0) {
-            return -taskGroupPriorityCompareResult;
-        }
-        // The task instance shouldn't be equals
-        return taskInstance.getId().compareTo(other.getTaskInstance().getId());
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
index 8f66189617..62617f4aac 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
@@ -25,7 +25,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
  * This interface is used to define a task which is executing.
  * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
  */
-public interface TaskExecuteRunnable {
+public interface TaskExecuteRunnable extends Comparable<TaskExecuteRunnable> {
 
     void dispatch();
 
@@ -40,4 +40,5 @@ public interface TaskExecuteRunnable {
     TaskInstance getTaskInstance();
 
     TaskExecutionContext getTaskExecutionContext();
+
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
index 8fa2e2926d..72073359d3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
@@ -17,14 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
 
-import java.util.concurrent.TimeUnit;
-
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -43,16 +42,17 @@ public abstract class 
BaseTaskExecuteRunnableDispatchOperator implements TaskExe
 
     @Override
     public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
-        long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS);
         TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
-        if (remainTime > 0) {
+        long remainTimeMills =
+                DateUtils.getRemainTime(taskInstance.getFirstSubmitTime(), 
taskInstance.getDelayTime() * 60L) * 1_000;
+        if (remainTimeMills > 0) {
             taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION);
             taskInstanceDao.updateById(taskInstance);
-            log.info("Current taskInstance: {} is choose delay execution, 
delay time: {}/min, remainTime: {}/s",
+            log.info("Current taskInstance: {} is choose delay execution, 
delay time: {}/min, remainTime: {}/ms",
                     taskInstance.getName(),
                     taskInstance.getDelayTime(),
-                    remainTime);
+                    remainTimeMills);
         }
-        
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable,
 remainTimeMills);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
new file mode 100644
index 0000000000..da6a750261
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Getter;
+
+import org.jetbrains.annotations.NotNull;
+
+public class DelayEntry<V extends Comparable<V>> implements Delayed {
+    // Pairs a payload with a delay; implements Delayed so it can be stored in a java.util.concurrent.DelayQueue.
+    private final long delayTimeMills;
+    // Absolute expiry time: System.currentTimeMillis() at construction plus delayTimeMills.
+    private final long triggerTimeMills;
+    // Payload; the constructor rejects null via checkNotNull.
+    @Getter
+    private final V data;
+
+    public DelayEntry(long delayTimeMills, V data) {
+        this.delayTimeMills = delayTimeMills;
+        this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
+        this.data = checkNotNull(data, "data is null");
+    }
+    // Returns the time left until triggerTimeMills, converted to the given unit; zero or negative once it has passed.
+    @Override
+    public long getDelay(@NotNull TimeUnit unit) {
+        long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
+        if (TimeUnit.MILLISECONDS.equals(unit)) {
+            return remainTimeMills;
+        }
+        return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
+    }
+    // Orders by constructor delay, then payload. NOTE(review): getDelay uses triggerTimeMills instead - confirm intended.
+    @Override
+    public int compareTo(@NotNull Delayed o) {
+        DelayEntry<V> other = (DelayEntry<V>) o;
+        int delayTimeMillsCompareResult = Long.compare(delayTimeMills, 
other.delayTimeMills);
+        if (delayTimeMillsCompareResult != 0) {
+            return delayTimeMillsCompareResult;
+        }
+        // NOTE(review): the constructor rejects null data, so this guard looks unreachable - confirm before removing.
+        if (data == null || other.data == null) {
+            return 0;
+        }
+        return data.compareTo(other.data);
+    }
+    // Entries are equal when the constructor delay and the payload are equal; triggerTimeMills is not compared.
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        DelayEntry<?> that = (DelayEntry<?>) o;
+        return delayTimeMills == that.delayTimeMills && Objects.equals(data, 
that.data);
+    }
+    // Hashes the same two fields that equals compares.
+    @Override
+    public int hashCode() {
+        return Objects.hash(delayTimeMills, data);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
similarity index 55%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
index 8f66189617..8ed4869625 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
@@ -15,29 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.runner.queue;
 
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import java.util.concurrent.DelayQueue;
 
-/**
- * This interface is used to define a task which is executing.
- * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
- */
-public interface TaskExecuteRunnable {
-
-    void dispatch();
+import lombok.SneakyThrows;
 
-    void kill();
+public class PriorityDelayQueue<V extends DelayEntry> {
 
-    void pause();
+    private final DelayQueue<V> queue = new DelayQueue<>();
 
-    void timeout();
+    public void add(V v) {
+        queue.put(v);
+    }
 
-    ProcessInstance getWorkflowInstance();
+    @SneakyThrows
+    public V take() {
+        return queue.take();
+    }
 
-    TaskInstance getTaskInstance();
+    public int size() {
+        return queue.size();
+    }
 
-    TaskExecutionContext getTaskExecutionContext();
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
new file mode 100644
index 0000000000..843456b98f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
+
+import org.apache.commons.lang3.time.DateUtils;
+
+import java.time.Duration;
+import java.util.Date;
+import java.util.concurrent.CompletableFuture;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link GlobalTaskDispatchWaitingQueue}: dispatching with and without a delay,
+ * blocking behaviour of take on an empty queue, the order in which queued runnables are taken,
+ * and the waiting-task counter.
+ */
+class GlobalTaskDispatchWaitingQueueTest {
+
+    // Queue under test; re-created before every test by setUp().
+    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
+
+    /**
+     * Creates a fresh, empty queue so that tests do not share state.
+     */
+    @BeforeEach
+    public void setUp() {
+        globalTaskDispatchWaitingQueue = new GlobalTaskDispatchWaitingQueue();
+    }
+
+    /**
+     * A runnable dispatched without a delay must be obtainable from the queue within one second.
+     */
+    @Test
+    void submitTaskExecuteRunnable() {
+        TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(1))
+                .untilAsserted(
+                        () -> 
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
+    }
+
+    /**
+     * The same runnable is dispatched twice: once with a delay of 3_000 (presumably milliseconds -
+     * confirm against the queue API) and once without a delay. The first take is asserted right
+     * away; the second take must succeed no earlier than 2 seconds and no later than 4 seconds
+     * after the wait starts.
+     */
+    @Test
+    void testSubmitTaskExecuteRunnableWithDelay() {
+        TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable,
 3_000L);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
+
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
+        Awaitility.await()
+                .atLeast(Duration.ofSeconds(2))
+                .atMost(Duration.ofSeconds(4))
+                .untilAsserted(
+                        () -> 
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
+    }
+
+    /**
+     * Taking from an empty queue is run asynchronously and must not complete: waiting up to three
+     * seconds for the future to finish is expected to end in a ConditionTimeoutException.
+     * NOTE(review): nothing in this test ever unblocks the asynchronous take call.
+     */
+    @Test
+    void takeTaskExecuteRunnable_NoElementShouldBlock() {
+        CompletableFuture<Void> completableFuture =
+                CompletableFuture.runAsync(() -> 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable());
+        assertThrowsExactly(ConditionTimeoutException.class,
+                () -> await()
+                        .atLeast(Duration.ofSeconds(2))
+                        .timeout(Duration.ofSeconds(3))
+                        .until(completableFuture::isDone));
+    }
+
+    /**
+     * Three runnables are dispatched with task instance priority MEDIUM (id 1), HIGH (id 2) and
+     * LOW (id 3); they are expected to be taken in the order 2, 1, 3, i.e. HIGH before MEDIUM
+     * before LOW.
+     */
+    @Test
+    void takeTaskExecuteRunnable_withDifferentTaskInstancePriority() {
+        TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
+        taskExecuteRunnable1.getTaskInstance().setId(1);
+        
taskExecuteRunnable1.getTaskInstance().setTaskInstancePriority(Priority.MEDIUM);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
+
+        TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
+        taskExecuteRunnable2.getTaskInstance().setId(2);
+        
taskExecuteRunnable2.getTaskInstance().setTaskInstancePriority(Priority.HIGH);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
+
+        TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
+        taskExecuteRunnable3.getTaskInstance().setId(3);
+        
taskExecuteRunnable3.getTaskInstance().setTaskInstancePriority(Priority.LOW);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
+
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(2);
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(1);
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(3);
+    }
+
+    /**
+     * Three runnables are dispatched whose task group priority is set from the codes of
+     * Priority.MEDIUM (id 1), Priority.HIGH (id 2) and Priority.LOW (id 3); they are expected to
+     * be taken in the order 3, 1, 2 - the reverse of the task instance priority test.
+     * NOTE(review): presumably a larger task group priority value is served first - confirm
+     * against the queue's ordering logic and the Priority codes.
+     */
+    @Test
+    void takeTaskExecuteRunnable_withDifferentTaskGroupPriority() {
+        TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
+        taskExecuteRunnable1.getTaskInstance().setId(1);
+        
taskExecuteRunnable1.getTaskInstance().setTaskGroupPriority(Priority.MEDIUM.getCode());
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
+
+        TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
+        taskExecuteRunnable2.getTaskInstance().setId(2);
+        
taskExecuteRunnable2.getTaskInstance().setTaskGroupPriority(Priority.HIGH.getCode());
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
+
+        TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
+        taskExecuteRunnable3.getTaskInstance().setId(3);
+        
taskExecuteRunnable3.getTaskInstance().setTaskGroupPriority(Priority.LOW.getCode());
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
+
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(3);
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(1);
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(2);
+    }
+
+    /**
+     * Three runnables are dispatched with a first submit time of now (id 1), one minute later
+     * (id 2) and one minute earlier (id 3); they are expected to be taken in the order 3, 1, 2,
+     * i.e. the earliest first submit time first.
+     */
+    @Test
+    void takeTaskExecuteRunnable_withDifferentSubmitTime() {
+        Date now = new Date();
+
+        TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
+        taskExecuteRunnable1.getTaskInstance().setId(1);
+        taskExecuteRunnable1.getTaskInstance().setFirstSubmitTime(now);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
+
+        TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
+        taskExecuteRunnable2.getTaskInstance().setId(2);
+        
taskExecuteRunnable2.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
 1));
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
+
+        TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
+        taskExecuteRunnable3.getTaskInstance().setId(3);
+        
taskExecuteRunnable3.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
 -1));
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
+
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(3);
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(1);
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+                .isEqualTo(2);
+    }
+
+    /**
+     * The waiting-task counter must be 0 for a new queue and 1 after a single dispatch.
+     */
+    @Test
+    void getWaitingDispatchTaskNumber() {
+        Assertions.assertEquals(0, 
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
+        TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
+        Assertions.assertEquals(1, 
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
+    }
+
+    /**
+     * Builds a DefaultTaskExecuteRunnable fixture: workflow instance priority MEDIUM, task
+     * instance priority MEDIUM, first submit time set to the current time, an empty
+     * TaskExecutionContext and a new TaskExecuteRunnableOperatorManager. Tests override
+     * individual fields on the returned runnable's task instance as needed.
+     *
+     * @return a new runnable with default test values
+     */
+    private TaskExecuteRunnable createTaskExecuteRunnable() {
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setProcessInstancePriority(Priority.MEDIUM);
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setTaskInstancePriority(Priority.MEDIUM);
+        taskInstance.setFirstSubmitTime(new Date());
+
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+
+        return new DefaultTaskExecuteRunnable(processInstance, taskInstance, 
taskExecutionContext,
+                new TaskExecuteRunnableOperatorManager());
+    }
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
deleted file mode 100644
index 778884e066..0000000000
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.execute;
-
-import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class PriorityDelayTaskExecuteRunnableTest {
-
-    @Test
-    public void testCompareTo() {
-        TaskExecuteRunnableOperatorManager taskOperatorManager = new 
TaskExecuteRunnableOperatorManager();
-
-        ProcessInstance workflowInstance = new ProcessInstance();
-        workflowInstance.setId(1);
-        workflowInstance.setProcessInstancePriority(Priority.HIGH);
-
-        TaskInstance t1 = new TaskInstance();
-        t1.setId(1);
-        t1.setTaskInstancePriority(Priority.HIGH);
-
-        TaskInstance t2 = new TaskInstance();
-        t2.setId(1);
-        t2.setTaskInstancePriority(Priority.HIGH);
-
-        TaskExecutionContext context1 = new TaskExecutionContext();
-        TaskExecutionContext context2 = new TaskExecutionContext();
-        PriorityDelayTaskExecuteRunnable p1 =
-                new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, 
taskOperatorManager);
-        PriorityDelayTaskExecuteRunnable p2 =
-                new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, 
taskOperatorManager);
-
-        Assertions.assertEquals(0, p1.compareTo(p2));
-
-        // the higher priority, the higher priority
-        t2.setTaskInstancePriority(Priority.MEDIUM);
-        Assertions.assertTrue(p1.compareTo(p2) < 0);
-
-        // the smaller dispatch fail times, the higher priority
-        context1.setDispatchFailTimes(1);
-        Assertions.assertTrue(p1.compareTo(p2) > 0);
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
similarity index 55%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
copy to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
index 8f66189617..0e3ab5da36 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
@@ -15,29 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.runner.queue;
 
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import java.util.concurrent.TimeUnit;
 
-/**
- * This interface is used to define a task which is executing.
- * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
- */
-public interface TaskExecuteRunnable {
-
-    void dispatch();
-
-    void kill();
-
-    void pause();
-
-    void timeout();
+import org.junit.jupiter.api.Test;
 
-    ProcessInstance getWorkflowInstance();
+import com.google.common.truth.Truth;
 
-    TaskInstance getTaskInstance();
+class DelayEntryTest {
 
-    TaskExecutionContext getTaskExecutionContext();
+    /**
+     * Verifies that a freshly created entry reports a remaining delay close to the delay it was
+     * constructed with, converted to the requested time unit.
+     */
+    @Test
+    void getDelay() {
+        long delayMillis = 1_000L;
+        DelayEntry<String> delayEntry = new DelayEntry<>(delayMillis, "Item");
+
+        // The remaining delay shrinks as the clock advances between construction and the call
+        // below, so a tolerance of 100 nanoseconds makes the test timing-sensitive. Allow 100
+        // milliseconds (expressed in nanoseconds) instead.
+        long toleranceNanos = TimeUnit.MILLISECONDS.toNanos(100L);
+        Truth.assertThat(delayEntry.getDelay(TimeUnit.NANOSECONDS))
+                .isWithin(toleranceNanos)
+                .of(TimeUnit.MILLISECONDS.toNanos(delayMillis));
+    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 4304ad2c13..b825949e88 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -203,11 +203,6 @@ public class TaskExecutionContext implements Serializable {
      */
     private String workerGroup;
 
-    /**
-     * delay execution time.
-     */
-    private int delayTime;
-
     /**
      * current execution status
      */
@@ -262,12 +257,9 @@ public class TaskExecutionContext implements Serializable {
 
     private boolean logBufferEnable;
 
-    /**
-     * dispatch fail times
-     */
     private int dispatchFailTimes;
 
-    public void increaseDispatchFailTimes() {
-        this.dispatchFailTimes++;
+    /**
+     * Increments the dispatch-failure counter by one and returns the updated value.
+     *
+     * @return the value of {@code dispatchFailTimes} after the increment
+     */
+    public int increaseDispatchFailTimes() {
+        return ++dispatchFailTimes;
     }
 }


Reply via email to