This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e6c57430e3 [DSIP-44] Set a delay time to TaskExecuteRunnable if it
dispatched failed (#16069)
e6c57430e3 is described below
commit e6c57430e33df31b56bb0f1a4ecfaecd04e6cfc9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 27 21:53:11 2024 +0800
[DSIP-44] Set a delay time to TaskExecuteRunnable if it dispatched failed
(#16069)
---
.../builder/TaskExecutionContextBuilder.java | 1 -
.../master/runner/BaseTaskExecuteRunnable.java | 28 ++++
.../master/runner/DefaultTaskExecuteRunnable.java | 2 +-
.../runner/GlobalTaskDispatchWaitingQueue.java | 35 +++-
.../GlobalTaskDispatchWaitingQueueLooper.java | 53 +++---
.../runner/PriorityDelayTaskExecuteRunnable.java | 85 ----------
.../server/master/runner/TaskExecuteRunnable.java | 3 +-
.../BaseTaskExecuteRunnableDispatchOperator.java | 14 +-
.../server/master/runner/queue/DelayEntry.java | 82 +++++++++
.../PriorityDelayQueue.java} | 32 ++--
.../runner/GlobalTaskDispatchWaitingQueueTest.java | 184 +++++++++++++++++++++
.../PriorityDelayTaskExecuteRunnableTest.java | 67 --------
.../master/runner/queue/DelayEntryTest.java} | 32 ++--
.../plugin/task/api/TaskExecutionContext.java | 12 +-
14 files changed, 385 insertions(+), 245 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
index 832c1b336b..5990a53e0f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
@@ -66,7 +66,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setEnvironmentConfig(taskInstance.getEnvironmentConfig());
taskExecutionContext.setHost(taskInstance.getHost());
- taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
taskExecutionContext.setVarPool(taskInstance.getVarPool());
taskExecutionContext.setDryRun(taskInstance.getDryRun());
taskExecutionContext.setTestFlag(taskInstance.getTestFlag());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
index fefdbf3493..2e41173c4b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
@@ -52,4 +52,32 @@ public abstract class BaseTaskExecuteRunnable implements
TaskExecuteRunnable {
return taskExecutionContext;
}
+    /**
+     * Defines the dispatch order of two {@link TaskExecuteRunnable}s; the one that compares smaller is ordered first.
+     * <p>
+     * The comparison falls through the following keys until one of them differs:
+     * <ol>
+     * <li>workflow instance priority code, ascending</li>
+     * <li>task instance priority code, ascending (smaller number, higher priority)</li>
+     * <li>task group priority, descending (larger number, higher priority)</li>
+     * <li>first submit time of the task instance, ascending (earlier submit time, higher priority)</li>
+     * </ol>
+     * The integer keys are compared with {@link Integer#compare(int, int)} instead of subtraction, so extreme
+     * priority values cannot overflow and flip the sign of the result.
+     *
+     * @param other the runnable to compare with; {@code null} is ordered before this instance
+     * @return a negative, zero or positive value as this runnable is ordered before, equal to or after {@code other}
+     */
+    @Override
+    public int compareTo(TaskExecuteRunnable other) {
+        if (other == null) {
+            return 1;
+        }
+        int workflowInstancePriorityCompareResult = Integer.compare(
+                workflowInstance.getProcessInstancePriority().getCode(),
+                other.getWorkflowInstance().getProcessInstancePriority().getCode());
+        if (workflowInstancePriorityCompareResult != 0) {
+            return workflowInstancePriorityCompareResult;
+        }
+
+        // smaller number, higher priority
+        int taskInstancePriorityCompareResult = Integer.compare(
+                taskInstance.getTaskInstancePriority().getCode(),
+                other.getTaskInstance().getTaskInstancePriority().getCode());
+        if (taskInstancePriorityCompareResult != 0) {
+            return taskInstancePriorityCompareResult;
+        }
+
+        // larger number, higher priority: the arguments are swapped to get a descending order
+        int taskGroupPriorityCompareResult = Integer.compare(
+                other.getTaskInstance().getTaskGroupPriority(),
+                taskInstance.getTaskGroupPriority());
+        if (taskGroupPriorityCompareResult != 0) {
+            return taskGroupPriorityCompareResult;
+        }
+        // earlier submit time, higher priority
+        return taskInstance.getFirstSubmitTime().compareTo(other.getTaskInstance().getFirstSubmitTime());
+    }
+
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
index c1b13717bd..ba4f447216 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
-public class DefaultTaskExecuteRunnable extends
PriorityDelayTaskExecuteRunnable {
+public class DefaultTaskExecuteRunnable extends BaseTaskExecuteRunnable {
private final TaskExecuteRunnableOperatorManager
taskExecuteRunnableOperatorManager;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index f03bd6b903..21d6c890f5 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -17,7 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner;
-import java.util.concurrent.DelayQueue;
+import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -25,26 +26,42 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
- * The class is used to store {@link TaskExecuteRunnable} which needs to be
dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link
DelayQueue},
- * if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be
consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
+ * The class is used to store {@link TaskExecuteRunnable} which needs to be
dispatched. The {@link TaskExecuteRunnable}
+ * will be stored in {@link PriorityDelayQueue}, if the {@link
TaskExecuteRunnable}'s delay time is 0, then it will be
+ * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
+ * <p>
+ * The order of {@link TaskExecuteRunnable} in the {@link PriorityDelayQueue}
is determined by {@link TaskExecuteRunnable#compareTo}.
*/
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {
- private final DelayQueue<DefaultTaskExecuteRunnable> queue = new
DelayQueue<>();
+ private final PriorityDelayQueue<DelayEntry<TaskExecuteRunnable>>
priorityDelayQueue = new PriorityDelayQueue<>();
- public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable
priorityTaskExecuteRunnable) {
- queue.put(priorityTaskExecuteRunnable);
+ /**
+ * Submit a {@link TaskExecuteRunnable} with delay time 0, it will be
consumed immediately.
+ */
+ public void dispatchTaskExecuteRunnable(TaskExecuteRunnable
taskExecuteRunnable) {
+ dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, 0);
}
+ /**
+ * Submit a {@link TaskExecuteRunnable} with delay time, if the delay time
<= 0 then it can be consumed.
+ */
+ public void dispatchTaskExecuteRunnableWithDelay(TaskExecuteRunnable
taskExecuteRunnable, long delayTimeMills) {
+ priorityDelayQueue.add(new DelayEntry<>(delayTimeMills,
taskExecuteRunnable));
+ }
+
+ /**
+ * Consume {@link TaskExecuteRunnable} from the {@link
PriorityDelayQueue}, only the delay time <= 0 can be consumed.
+ */
@SneakyThrows
- public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() {
- return queue.take();
+ public TaskExecuteRunnable takeTaskExecuteRunnable() {
+ return priorityDelayQueue.take().getData();
}
public int getWaitingDispatchTaskNumber() {
- return queue.size();
+ return priorityDelayQueue.size();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index eabbdd8e10..5cfb285c28 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -18,13 +18,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
-import
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
@@ -43,10 +41,6 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
- private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new
AtomicInteger();
-
- private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;
-
public GlobalTaskDispatchWaitingQueueLooper() {
super("GlobalTaskDispatchWaitingQueueLooper");
}
@@ -64,29 +58,34 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
@Override
public void run() {
- DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
while (RUNNING_FLAG.get()) {
- defaultTaskExecuteRunnable =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
- try {
- TaskExecutionStatus status =
defaultTaskExecuteRunnable.getTaskInstance().getState();
- if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status
!= TaskExecutionStatus.DELAY_EXECUTION) {
- log.warn("The TaskInstance {} state is : {}, will not
dispatch",
-
defaultTaskExecuteRunnable.getTaskInstance().getName(), status);
- continue;
- }
+ doDispatch();
+ }
+ }
- TaskDispatcher taskDispatcher =
-
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
- taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
- DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
- } catch (Exception e) {
-
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
-
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
- if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() >
MAX_DISPATCHED_FAILED_TIMES) {
- ThreadUtils.sleep(10 * 1000L);
- }
- log.error("Dispatch Task: {} failed",
defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
+    /**
+     * Takes one {@link TaskExecuteRunnable} from the waiting queue (blocking until one is available) and hands it
+     * to the task dispatcher selected for its task instance.
+     * <p>
+     * A runnable whose task instance is {@code null}, or whose state is neither {@code SUBMITTED_SUCCESS} nor
+     * {@code DELAY_EXECUTION}, is dropped. If dispatching throws, the runnable is put back into the queue with a
+     * delay of {@code increaseDispatchFailTimes() * 1000} milliseconds, capped at 60 seconds.
+     */
+    void doDispatch() {
+        final TaskExecuteRunnable taskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
+        TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
+        if (taskInstance == null) {
+            // This case shouldn't happen, but if it does, log an error and drop the runnable
+            log.error("The TaskInstance is null, drop it(This case shouldn't happen)");
+            return;
+        }
+        try {
+            TaskExecutionStatus status = taskInstance.getState();
+            if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) {
+                log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status);
+                return;
             }
+            taskDispatchFactory.getTaskDispatcher(taskInstance).dispatchTask(taskExecuteRunnable);
+        } catch (Exception e) {
+            // If dispatch failed, put the task back to the queue so it is retried after a waiting time.
+            // The waiting time grows with every failed dispatch but must not exceed 60 seconds, hence Math.min:
+            // Math.max would make every retry wait at least 60 seconds and let the delay grow without bound.
+            long waitingTimeMills = Math.min(
+                    taskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
+            globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, waitingTimeMills);
+            log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
         }
     }
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
deleted file mode 100644
index 255ec6c8ac..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-public abstract class PriorityDelayTaskExecuteRunnable extends
BaseTaskExecuteRunnable implements Delayed {
-
- public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance,
- TaskInstance taskInstance,
- TaskExecutionContext
taskExecutionContext) {
- super(workflowInstance, taskInstance, taskExecutionContext);
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(
-
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
- taskExecutionContext.getDelayTime() * 60L),
- TimeUnit.SECONDS);
- }
-
- @Override
- public int compareTo(Delayed o) {
- if (o == null) {
- return 1;
- }
- int delayTimeCompareResult =
- Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
- if (delayTimeCompareResult != 0) {
- return delayTimeCompareResult;
- }
- PriorityDelayTaskExecuteRunnable other =
(PriorityDelayTaskExecuteRunnable) o;
- // the smaller dispatch fail times, the higher priority
- int dispatchFailTimesCompareResult =
taskExecutionContext.getDispatchFailTimes()
- - other.getTaskExecutionContext().getDispatchFailTimes();
- if (dispatchFailTimesCompareResult != 0) {
- return dispatchFailTimesCompareResult;
- }
- int workflowInstancePriorityCompareResult =
workflowInstance.getProcessInstancePriority().getCode()
- -
other.getWorkflowInstance().getProcessInstancePriority().getCode();
- if (workflowInstancePriorityCompareResult != 0) {
- return workflowInstancePriorityCompareResult;
- }
- long workflowInstanceIdCompareResult =
workflowInstance.getId().compareTo(other.getWorkflowInstance().getId());
- if (workflowInstanceIdCompareResult != 0) {
- return workflowInstancePriorityCompareResult;
- }
- int taskInstancePriorityCompareResult =
taskInstance.getTaskInstancePriority().getCode()
- - other.getTaskInstance().getTaskInstancePriority().getCode();
- if (taskInstancePriorityCompareResult != 0) {
- return taskInstancePriorityCompareResult;
- }
- // larger number, higher priority
- int taskGroupPriorityCompareResult =
- taskInstance.getTaskGroupPriority() -
other.getTaskInstance().getTaskGroupPriority();
- if (taskGroupPriorityCompareResult != 0) {
- return -taskGroupPriorityCompareResult;
- }
- // The task instance shouldn't be equals
- return taskInstance.getId().compareTo(other.getTaskInstance().getId());
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
index 8f66189617..62617f4aac 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
@@ -25,7 +25,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
* This interface is used to define a task which is executing.
* todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
*/
-public interface TaskExecuteRunnable {
+public interface TaskExecuteRunnable extends Comparable<TaskExecuteRunnable> {
void dispatch();
@@ -40,4 +40,5 @@ public interface TaskExecuteRunnable {
TaskInstance getTaskInstance();
TaskExecutionContext getTaskExecutionContext();
+
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
index 8fa2e2926d..72073359d3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
@@ -17,14 +17,13 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
-import java.util.concurrent.TimeUnit;
-
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -43,16 +42,17 @@ public abstract class
BaseTaskExecuteRunnableDispatchOperator implements TaskExe
@Override
public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
- long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS);
TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
- if (remainTime > 0) {
+ long remainTimeMills =
+ DateUtils.getRemainTime(taskInstance.getFirstSubmitTime(),
taskInstance.getDelayTime() * 60L) * 1_000;
+ if (remainTimeMills > 0) {
taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION);
taskInstanceDao.updateById(taskInstance);
- log.info("Current taskInstance: {} is choose delay execution,
delay time: {}/min, remainTime: {}/s",
+ log.info("Current taskInstance: {} is choose delay execution,
delay time: {}/min, remainTime: {}/ms",
taskInstance.getName(),
taskInstance.getDelayTime(),
- remainTime);
+ remainTimeMills);
}
-
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable,
remainTimeMills);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
new file mode 100644
index 0000000000..da6a750261
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Getter;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A {@link Delayed} wrapper that pairs a payload with the delay after which it may be taken from a
+ * {@link java.util.concurrent.DelayQueue}.
+ * <p>
+ * The trigger time is fixed at construction as {@code System.currentTimeMillis() + delayTimeMills}. Entries are
+ * ordered by their configured delay first and by the natural order of the payload second.
+ * <p>
+ * NOTE(review): the ordering compares the configured delay, not the absolute trigger time, while
+ * {@link #getDelay(TimeUnit)} is derived from the trigger time. An entry created earlier with a longer delay
+ * therefore sorts behind a later entry with a shorter delay even if it expires first. Confirm this is intended
+ * before relying on strict expiry order.
+ *
+ * @param <V> type of the wrapped payload
+ */
+public class DelayEntry<V extends Comparable<V>> implements Delayed {
+
+    // the delay requested at construction time, in milliseconds
+    private final long delayTimeMills;
+
+    // wall-clock time (System.currentTimeMillis() based) at which the entry expires
+    private final long triggerTimeMills;
+
+    @Getter
+    private final V data;
+
+    /**
+     * @param delayTimeMills delay in milliseconds, counted from now; a value {@code <= 0} expires immediately
+     * @param data           the payload, must not be {@code null}
+     * @throws NullPointerException if {@code data} is {@code null}
+     */
+    public DelayEntry(long delayTimeMills, V data) {
+        this.delayTimeMills = delayTimeMills;
+        this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
+        this.data = checkNotNull(data, "data is null");
+    }
+
+    /**
+     * Returns the remaining time until the trigger time in the given unit; zero or negative once expired.
+     */
+    @Override
+    public long getDelay(@NotNull TimeUnit unit) {
+        long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
+        return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Compares by configured delay, then by payload.
+     *
+     * @throws ClassCastException if {@code o} is not a {@link DelayEntry}
+     */
+    @Override
+    public int compareTo(@NotNull Delayed o) {
+        // only DelayEntry instances are expected to share a queue, so the cast is safe in practice
+        @SuppressWarnings("unchecked")
+        DelayEntry<V> other = (DelayEntry<V>) o;
+        int delayTimeMillsCompareResult = Long.compare(delayTimeMills, other.delayTimeMills);
+        if (delayTimeMillsCompareResult != 0) {
+            return delayTimeMillsCompareResult;
+        }
+        // data is never null (enforced by the constructor), so it can be compared directly
+        return data.compareTo(other.data);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DelayEntry<?> that = (DelayEntry<?>) o;
+        return delayTimeMills == that.delayTimeMills && Objects.equals(data, that.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(delayTimeMills, data);
+    }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
similarity index 55%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
index 8f66189617..8ed4869625 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
@@ -15,29 +15,27 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.runner.queue;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import java.util.concurrent.DelayQueue;
-/**
- * This interface is used to define a task which is executing.
- * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
- */
-public interface TaskExecuteRunnable {
-
- void dispatch();
+import lombok.SneakyThrows;
- void kill();
+/**
+ * A thin wrapper around {@link DelayQueue} that holds {@link DelayEntry} elements.
+ * <p>
+ * An element can only be taken once its delay has expired; the order among elements is the one defined by
+ * {@link DelayEntry#compareTo}.
+ *
+ * @param <V> the concrete entry type stored in the queue
+ */
+public class PriorityDelayQueue<V extends DelayEntry<?>> {
-    void pause();
+    private final DelayQueue<V> queue = new DelayQueue<>();
-    void timeout();
+    /**
+     * Inserts the entry into the queue; never blocks because the underlying queue is unbounded.
+     */
+    public void add(V v) {
+        queue.put(v);
+    }
-    ProcessInstance getWorkflowInstance();
+    /**
+     * Retrieves and removes the head entry, waiting until an entry with an expired delay is available.
+     * An {@link InterruptedException} raised while waiting is rethrown unchecked via {@code @SneakyThrows}.
+     */
+    @SneakyThrows
+    public V take() {
+        return queue.take();
+    }
-    TaskInstance getTaskInstance();
+    /**
+     * Returns the number of entries in the queue, expired or not.
+     */
+    public int size() {
+        return queue.size();
+    }
-    TaskExecutionContext getTaskExecutionContext();
 }
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
new file mode 100644
index 0000000000..843456b98f
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
+
+import org.apache.commons.lang3.time.DateUtils;
+
+import java.time.Duration;
+import java.util.Date;
+import java.util.concurrent.CompletableFuture;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class GlobalTaskDispatchWaitingQueueTest {
+
+ private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
+
+ @BeforeEach
+ public void setUp() {
+ globalTaskDispatchWaitingQueue = new GlobalTaskDispatchWaitingQueue();
+ }
+
+ @Test
+ void submitTaskExecuteRunnable() {
+ TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(
+ () ->
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
+ }
+
+ @Test
+ void testSubmitTaskExecuteRunnableWithDelay() {
+ TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable,
3_000L);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
+
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
+ Awaitility.await()
+ .atLeast(Duration.ofSeconds(2))
+ .atMost(Duration.ofSeconds(4))
+ .untilAsserted(
+ () ->
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
+ }
+
+ @Test
+ void takeTaskExecuteRunnable_NoElementShouldBlock() {
+ CompletableFuture<Void> completableFuture =
+ CompletableFuture.runAsync(() ->
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable());
+ assertThrowsExactly(ConditionTimeoutException.class,
+ () -> await()
+ .atLeast(Duration.ofSeconds(2))
+ .timeout(Duration.ofSeconds(3))
+ .until(completableFuture::isDone));
+ }
+
+ @Test
+ void takeTaskExecuteRunnable_withDifferentTaskInstancePriority() {
+ TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
+ taskExecuteRunnable1.getTaskInstance().setId(1);
+
taskExecuteRunnable1.getTaskInstance().setTaskInstancePriority(Priority.MEDIUM);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
+
+ TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
+ taskExecuteRunnable2.getTaskInstance().setId(2);
+
taskExecuteRunnable2.getTaskInstance().setTaskInstancePriority(Priority.HIGH);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
+
+ TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
+ taskExecuteRunnable3.getTaskInstance().setId(3);
+
taskExecuteRunnable3.getTaskInstance().setTaskInstancePriority(Priority.LOW);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
+
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(2);
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(1);
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(3);
+ }
+
+ @Test
+ void takeTaskExecuteRunnable_withDifferentTaskGroupPriority() {
+ TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
+ taskExecuteRunnable1.getTaskInstance().setId(1);
+
taskExecuteRunnable1.getTaskInstance().setTaskGroupPriority(Priority.MEDIUM.getCode());
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
+
+ TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
+ taskExecuteRunnable2.getTaskInstance().setId(2);
+
taskExecuteRunnable2.getTaskInstance().setTaskGroupPriority(Priority.HIGH.getCode());
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
+
+ TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
+ taskExecuteRunnable3.getTaskInstance().setId(3);
+
taskExecuteRunnable3.getTaskInstance().setTaskGroupPriority(Priority.LOW.getCode());
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
+
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(3);
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(1);
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(2);
+ }
+
+ @Test
+ void takeTaskExecuteRunnable_withDifferentSubmitTime() {
+ Date now = new Date();
+
+ TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable();
+ taskExecuteRunnable1.getTaskInstance().setId(1);
+ taskExecuteRunnable1.getTaskInstance().setFirstSubmitTime(now);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1);
+
+ TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable();
+ taskExecuteRunnable2.getTaskInstance().setId(2);
+
taskExecuteRunnable2.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
1));
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2);
+
+ TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable();
+ taskExecuteRunnable3.getTaskInstance().setId(3);
+
taskExecuteRunnable3.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
-1));
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3);
+
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(3);
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(1);
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
+ .isEqualTo(2);
+ }
+
+ @Test
+ void getWaitingDispatchTaskNumber() {
+ Assertions.assertEquals(0,
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
+ TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable();
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable);
+ Assertions.assertEquals(1,
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
+ }
+
+ private TaskExecuteRunnable createTaskExecuteRunnable() {
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setProcessInstancePriority(Priority.MEDIUM);
+
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setTaskInstancePriority(Priority.MEDIUM);
+ taskInstance.setFirstSubmitTime(new Date());
+
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+
+ return new DefaultTaskExecuteRunnable(processInstance, taskInstance,
taskExecutionContext,
+ new TaskExecuteRunnableOperatorManager());
+ }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
deleted file mode 100644
index 778884e066..0000000000
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.execute;
-
-import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
-import org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable;
-import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class PriorityDelayTaskExecuteRunnableTest {
-
- @Test
- public void testCompareTo() {
- TaskExecuteRunnableOperatorManager taskOperatorManager = new TaskExecuteRunnableOperatorManager();
-
- ProcessInstance workflowInstance = new ProcessInstance();
- workflowInstance.setId(1);
- workflowInstance.setProcessInstancePriority(Priority.HIGH);
-
- TaskInstance t1 = new TaskInstance();
- t1.setId(1);
- t1.setTaskInstancePriority(Priority.HIGH);
-
- TaskInstance t2 = new TaskInstance();
- t2.setId(1);
- t2.setTaskInstancePriority(Priority.HIGH);
-
- TaskExecutionContext context1 = new TaskExecutionContext();
- TaskExecutionContext context2 = new TaskExecutionContext();
- PriorityDelayTaskExecuteRunnable p1 =
- new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, taskOperatorManager);
- PriorityDelayTaskExecuteRunnable p2 =
- new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, taskOperatorManager);
-
- Assertions.assertEquals(0, p1.compareTo(p2));
-
- // the higher priority, the higher priority
- t2.setTaskInstancePriority(Priority.MEDIUM);
- Assertions.assertTrue(p1.compareTo(p2) < 0);
-
- // the smaller dispatch fail times, the higher priority
- context1.setDispatchFailTimes(1);
- Assertions.assertTrue(p1.compareTo(p2) > 0);
- }
-
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
similarity index 55%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
copy to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
index 8f66189617..0e3ab5da36 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
@@ -15,29 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.runner.queue;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import java.util.concurrent.TimeUnit;
-/**
- * This interface is used to define a task which is executing.
- * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
- */
-public interface TaskExecuteRunnable {
-
- void dispatch();
-
- void kill();
-
- void pause();
-
- void timeout();
+import org.junit.jupiter.api.Test;
- ProcessInstance getWorkflowInstance();
+import com.google.common.truth.Truth;
- TaskInstance getTaskInstance();
+class DelayEntryTest {
- TaskExecutionContext getTaskExecutionContext();
+ @Test
+ void getDelay() {
+ DelayEntry<String> delayEntry = new DelayEntry<>(1_000L, "Item");
+ Truth.assertThat(delayEntry.getDelay(TimeUnit.NANOSECONDS))
+ .isWithin(100)
+ .of(TimeUnit.NANOSECONDS.convert(1_000L, TimeUnit.MILLISECONDS));
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 4304ad2c13..b825949e88 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -203,11 +203,6 @@ public class TaskExecutionContext implements Serializable {
*/
private String workerGroup;
- /**
- * delay execution time.
- */
- private int delayTime;
-
/**
* current execution status
*/
@@ -262,12 +257,9 @@ public class TaskExecutionContext implements Serializable {
private boolean logBufferEnable;
- /**
- * dispatch fail times
- */
private int dispatchFailTimes;
- public void increaseDispatchFailTimes() {
- this.dispatchFailTimes++;
+ public int increaseDispatchFailTimes() {
+ return ++dispatchFailTimes;
}
}