This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b12b682d3f [Improvement-17795][Master] Add dispatch timeout checking
logic to handle cases where the worker group does not exist or no workers are
available (#17796)
b12b682d3f is described below
commit b12b682d3fde9ea0b7af6afdb851b5da6d772832
Author: njnu-seafish <[email protected]>
AuthorDate: Mon Feb 2 21:39:59 2026 +0800
[Improvement-17795][Master] Add dispatch timeout checking logic to handle
cases where the worker group does not exist or no workers are available (#17796)
---
docs/docs/en/architecture/configuration.md | 2 +
docs/docs/zh/architecture/configuration.md | 2 +
.../server/master/config/MasterConfig.java | 15 ++
.../server/master/config/TaskDispatchPolicy.java | 43 ++++
.../PhysicalTaskExecutorClientDelegator.java | 21 +-
.../task/dispatcher/WorkerGroupDispatcher.java | 59 ++++-
.../WorkerGroupDispatcherCoordinator.java | 9 +-
.../dispatch/NoAvailableWorkerException.java} | 16 +-
.../server/master/utils/ExceptionUtils.java | 10 +
.../src/main/resources/application.yaml | 5 +
.../server/master/config/MasterConfigTest.java | 11 +
.../WorkerGroupDispatcherCoordinatorTest.java | 12 +-
.../task/dispatcher/WorkerGroupDispatcherTest.java | 244 ++++++++++++++++++++-
.../integration/cases/WorkflowStartTestCase.java | 159 ++++++++++++++
.../src/test/resources/application.yaml | 5 +
.../start/workflow_with_no_available_worker.yaml | 61 ++++++
.../workflow_with_worker_group_not_found.yaml | 61 ++++++
.../plugin/task/api/TaskExecutionContext.java | 2 +
18 files changed, 707 insertions(+), 30 deletions(-)
diff --git a/docs/docs/en/architecture/configuration.md
b/docs/docs/en/architecture/configuration.md
index b0a5548e7b..27c289bcec 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -291,6 +291,8 @@ Location: `master-server/conf/application.yaml`
| master.command-fetch-strategy.type
| ID_SLOT_BASED | The command fetch strategy, only support
`ID_SLOT_BASED`
|
| master.command-fetch-strategy.config.id-step
| 1 | The id auto incremental step of t_ds_command
in db
|
| master.command-fetch-strategy.config.fetch-size
| 10 | The number of commands fetched by master
|
+| master.task-dispatch-policy.dispatch-timeout-enabled
| false | Indicates whether the dispatch timeout
checking mechanism is enabled
|
+| master.task-dispatch-policy.max-task-dispatch-duration
| 1h | The maximum time a task may keep failing to be
dispatched to a worker before it is marked as failed; only takes effect when dispatch-timeout-enabled is true
|
### Worker Server related configuration
diff --git a/docs/docs/zh/architecture/configuration.md
b/docs/docs/zh/architecture/configuration.md
index 0c4973d32c..80fed042f1 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -298,6 +298,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
| master.command-fetch-strategy.type
| ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED`
|
| master.command-fetch-strategy.config.id-step
| 1 | 数据库中t_ds_command的id自增步长
|
| master.command-fetch-strategy.config.fetch-size
| 10 | master拉取command数量
|
+| master.task-dispatch-policy.dispatch-timeout-enabled
| false | 是否开启master分派超时检测功能
|
+| master.task-dispatch-policy.max-task-dispatch-duration
| 1h | 任务分派失败的最长等待时长,超过后任务将被标记为失败,仅在开启分派超时检测时生效,默认为一小时
|
## Worker Server相关配置
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 220062bd57..b486b38092 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -74,6 +74,8 @@ public class MasterConfig implements Validator {
*/
private String masterRegistryPath;
+ private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+
@Override
public boolean supports(Class<?> clazz) {
return MasterConfig.class.isAssignableFrom(clazz);
@@ -97,6 +99,18 @@ public class MasterConfig implements Validator {
if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
errors.rejectValue("worker-group-refresh-interval", null, "should
>= 10s");
}
+
+ TaskDispatchPolicy dispatchPolicy =
masterConfig.getTaskDispatchPolicy();
+ if (dispatchPolicy.isDispatchTimeoutEnabled()) {
+ if (dispatchPolicy.getMaxTaskDispatchDuration() == null) {
+
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+ "must be specified when dispatch timeout checker is
enabled");
+ } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis()
<= 0) {
+
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+ "must be a positive duration (e.g., '10m', '30m',
'1h')");
+ }
+ }
+
if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
@@ -122,6 +136,7 @@ public class MasterConfig implements Validator {
"\n command-fetch-strategy: " + commandFetchStrategy +
"\n worker-load-balancer-configuration-properties: "
+ workerLoadBalancerConfigurationProperties +
+ "\n taskDispatchPolicy: " + taskDispatchPolicy +
"\n****************************Master
Configuration**************************************";
log.info(config);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
new file mode 100644
index 0000000000..3c93dc5d2b
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.config;
+
+import java.time.Duration;
+
+import lombok.Data;
+
+/**
+ * Configuration for the master's task dispatch policy, bound from the
+ * {@code master.task-dispatch-policy} section of the master's application.yaml.
+ * <p>
+ * When {@link #dispatchTimeoutEnabled} is {@code true}, a task whose dispatch attempt fails after
+ * more than {@link #maxTaskDispatchDuration} has elapsed since its first dispatch time is marked as
+ * failed instead of being put back into the dispatch queue, which prevents indefinite queuing.
+ * The timeout is only evaluated when a dispatch attempt fails; it is not checked while a task is
+ * merely waiting in the queue.
+ */
+@Data
+public class TaskDispatchPolicy {
+
+ /**
+ * Indicates whether the dispatch timeout checking mechanism is enabled.
+ * Defaults to {@code false}; when disabled, failed dispatches keep being re-queued with a
+ * back-off that grows with the number of failures and is capped at 60 seconds.
+ */
+ private boolean dispatchTimeoutEnabled = false;
+
+ /**
+ * The maximum allowed time a task may keep failing to be dispatched, measured from the task's
+ * first dispatch time. Once exceeded, the next failed dispatch attempt marks the task as failed.
+ * Must be non-null and positive when {@link #dispatchTimeoutEnabled} is {@code true} (checked in
+ * MasterConfig validation); there is no default in this class, the bundled application.yaml sets 1h.
+ * Examples: {@code "10m"}, {@code "30m"}, {@code "1h"}.
+ * NOTE(review): MasterConfig reports validation errors for this value under the field name
+ * "dispatch-timeout-checker.max-task-dispatch-duration", which does not match the
+ * "task-dispatch-policy" property prefix — confirm and align.
+ */
+ private Duration maxTaskDispatchDuration;
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
index f7566d3133..96331b2dca 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
@@ -24,10 +24,13 @@ import
org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import
org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
import
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
import
org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
@@ -55,18 +58,26 @@ public class PhysicalTaskExecutorClientDelegator implements
ITaskExecutorClientD
@Autowired
private IWorkerLoadBalancer workerLoadBalancer;
+ @Autowired
+ private ClusterManager clusterManager;
+
@Override
public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable)
throws TaskDispatchException {
final TaskExecutionContext taskExecutionContext =
taskExecutionRunnable.getTaskExecutionContext();
final String taskName = taskExecutionContext.getTaskName();
+ final String workerGroup = taskExecutionContext.getWorkerGroup();
+
+ // workerGroup not exist
+ if
(!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
+ throw new WorkerGroupNotFoundException(workerGroup);
+ }
+
+ // select an available worker from the worker group; throws
NoAvailableWorkerException if none is available.
final String physicalTaskExecutorAddress = workerLoadBalancer
- .select(taskExecutionContext.getWorkerGroup())
+ .select(workerGroup)
.map(Host::of)
.map(Host::getAddress)
- .orElseThrow(() -> new TaskDispatchException(
- String.format("Cannot find the host to dispatch
Task[id=%s, name=%s, workerGroup=%s]",
- taskExecutionContext.getTaskInstanceId(),
taskName,
- taskExecutionContext.getWorkerGroup())));
+ .orElseThrow(() -> new
NoAvailableWorkerException(workerGroup));
taskExecutionContext.setHost(physicalTaskExecutorAddress);
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index 28834e27e7..38f5533ba7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -18,12 +18,16 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
+import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,11 +52,22 @@ public class WorkerGroupDispatcher extends BaseDaemonThread
{
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
- public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient
taskExecutorClient) {
+ private final TaskDispatchPolicy taskDispatchPolicy;
+
+ private final long maxTaskDispatchMillis;
+
+ public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient
taskExecutorClient,
+ TaskDispatchPolicy taskDispatchPolicy) {
super("WorkerGroupTaskDispatcher-" + workerGroupName);
this.taskExecutorClient = taskExecutorClient;
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
+ this.taskDispatchPolicy = taskDispatchPolicy;
+ if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
+ this.maxTaskDispatchMillis =
taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
+ } else {
+ this.maxTaskDispatchMillis = 0L;
+ }
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
}
@@ -84,26 +99,54 @@ public class WorkerGroupDispatcher extends BaseDaemonThread
{
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskInstanceId = taskExecutionRunnable.getId();
+ final TaskExecutionContext taskExecutionContext =
taskExecutionRunnable.getTaskExecutionContext();
try {
- if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId()))
{
+ if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
log.info(
"The task: {} doesn't exist in
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
- taskExecutionRunnable.getId());
+ taskInstanceId);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
+ } catch (Exception ex) {
+ if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
+ // If a dispatch timeout occurs, the task will not be put back
into the queue.
+ long elapsed = System.currentTimeMillis() -
taskExecutionContext.getFirstDispatchTime();
+ if (elapsed > maxTaskDispatchMillis) {
+ onDispatchTimeout(taskExecutionRunnable, ex, elapsed,
maxTaskDispatchMillis);
+ return;
+ }
+ }
+
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not
exceed 60 seconds
- long waitingTimeMills = Math.min(
+ long waitingTimeMillis = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
- dispatchTask(taskExecutionRunnable, waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskExecutionRunnable.getId(),
- waitingTimeMills, e);
+ dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+ log.warn("Dispatch Task: {} failed will retry after: {}/ms",
taskInstanceId,
+ waitingTimeMillis, ex);
}
}
+ /**
+ * Marks a task as permanently failed due to dispatch timeout.
+ * Once called, the task is considered permanently failed and will not be
retried.
+ */
+ private void onDispatchTimeout(ITaskExecutionRunnable
taskExecutionRunnable, Exception ex,
+ long elapsed, long timeout) {
+ String taskName = taskExecutionRunnable.getName();
+ log.error("Task: {} dispatch timeout after {}ms (limit: {}ms)",
+ taskName, elapsed, timeout, ex);
+
+ final TaskFailedLifecycleEvent taskFailedEvent =
TaskFailedLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new Date())
+ .build();
+ taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+ }
+
/**
* Adds a task to the worker group queue.
* This method wraps the given task execution object into a priority and
delay-based task entry and adds it to the worker group queue.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
index a85674c6f4..086fc5359e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
@@ -41,8 +42,11 @@ public class WorkerGroupDispatcherCoordinator implements
AutoCloseable {
private final ConcurrentHashMap<String, WorkerGroupDispatcher>
workerGroupDispatcherMap;
- public WorkerGroupDispatcherCoordinator() {
+ private final MasterConfig masterConfig;
+
+ public WorkerGroupDispatcherCoordinator(final MasterConfig masterConfig) {
workerGroupDispatcherMap = new ConcurrentHashMap<>();
+ this.masterConfig = masterConfig;
}
public void start() {
@@ -99,7 +103,8 @@ public class WorkerGroupDispatcherCoordinator implements
AutoCloseable {
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String
workerGroup) {
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
- WorkerGroupDispatcher workerGroupDispatcher = new
WorkerGroupDispatcher(wg, taskExecutorClient);
+ WorkerGroupDispatcher workerGroupDispatcher =
+ new WorkerGroupDispatcher(wg, taskExecutorClient,
masterConfig.getTaskDispatchPolicy());
workerGroupDispatcher.start();
return workerGroupDispatcher;
});
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
similarity index 59%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
index 07156b58ab..d51e2342cc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
@@ -15,19 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.utils;
+package org.apache.dolphinscheduler.server.master.exception.dispatch;
-import
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+public class NoAvailableWorkerException extends TaskDispatchException {
-import org.springframework.dao.DataAccessResourceFailureException;
-
-public class ExceptionUtils {
-
- public static boolean isDatabaseConnectedFailedException(Throwable e) {
- return e instanceof DataAccessResourceFailureException;
- }
-
- public static boolean isTaskExecutionContextCreateException(Throwable e) {
- return e instanceof TaskExecutionContextCreateException;
+ public NoAvailableWorkerException(String workerGroup) {
+ super("Cannot find available worker under worker group: " +
workerGroup);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
index 07156b58ab..5c1233f7db 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.master.utils;
import
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
import org.springframework.dao.DataAccessResourceFailureException;
@@ -30,4 +32,12 @@ public class ExceptionUtils {
public static boolean isTaskExecutionContextCreateException(Throwable e) {
return e instanceof TaskExecutionContextCreateException;
}
+
+ public static boolean isWorkerGroupNotFoundException(Throwable e) {
+ return e instanceof WorkerGroupNotFoundException;
+ }
+
+ public static boolean isNoAvailableWorkerException(Throwable e) {
+ return e instanceof NoAvailableWorkerException;
+ }
}
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml
b/dolphinscheduler-master/src/main/resources/application.yaml
index 39a0f4311a..81c7ae3aed 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -111,6 +111,11 @@ master:
# Master max concurrent workflow instances, when the master's workflow
instance count exceeds this value, master server will be marked as busy.
max-concurrent-workflow-instances: 2147483647
worker-group-refresh-interval: 5m
+ # Task dispatch timeout check (disabled by default).
+ # When enabled, a task that still fails to be dispatched after max-task-dispatch-duration
is marked as failed instead of being retried.
+ task-dispatch-policy:
+ dispatch-timeout-enabled: false
+ max-task-dispatch-duration: 1h
command-fetch-strategy:
type: ID_SLOT_BASED
config:
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
index 991d6c249e..8ca3e7538d 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
@@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerType;
+import java.time.Duration;
+
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
@@ -72,4 +74,13 @@ public class MasterConfigTest {
assertThat(dynamicWeightConfigProperties.getCpuUsageWeight()).isEqualTo(30);
assertThat(dynamicWeightConfigProperties.getTaskThreadPoolUsageWeight()).isEqualTo(30);
}
+
+ @Test
+ public void getTaskDispatchPolicy() {
+ TaskDispatchPolicy policy = masterConfig.getTaskDispatchPolicy();
+
+ assertThat(policy).isNotNull();
+ assertThat(policy.isDispatchTimeoutEnabled()).isFalse();
+
assertThat(policy.getMaxTaskDispatchDuration()).isEqualTo(Duration.ofHours(1));
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
index e2e96a9614..8e838b724a 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
@@ -24,25 +24,33 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.test.util.ReflectionTestUtils;
@ExtendWith(MockitoExtension.class)
class WorkerGroupDispatcherCoordinatorTest {
- @InjectMocks
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
@Mock
private ITaskExecutorClient taskExecutorClient;
+ @BeforeEach
+ void setUp() {
+ MasterConfig masterConfig = new MasterConfig();
+ workerGroupDispatcherCoordinator = new
WorkerGroupDispatcherCoordinator(masterConfig);
+ ReflectionTestUtils.setField(workerGroupDispatcherCoordinator,
"taskExecutorClient", taskExecutorClient);
+ }
+
@Test
void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
String workerGroup = "newGroup";
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
index 77525cb181..210f7c21c9 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
@@ -18,21 +18,36 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
+import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
import
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -46,7 +61,9 @@ class WorkerGroupDispatcherTest {
@BeforeEach
void setUp() {
taskExecutorClient = mock(ITaskExecutorClient.class);
- dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient);
+ final MasterConfig masterConfig = new MasterConfig();
+ dispatcher =
+ new WorkerGroupDispatcher("TestGroup", taskExecutorClient,
masterConfig.getTaskDispatchPolicy());
}
@Test
@@ -138,4 +155,229 @@ class WorkerGroupDispatcherTest {
.untilAsserted(() -> verify(taskExecutorClient,
times(2)).dispatch(taskExecutionRunnable));
}
+ @Test
+ void dispatchTask_WorkerGroupNotFound_TimeoutDisabled_ShouldKeepRetrying()
throws TaskDispatchException {
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+ WorkerGroupNotFoundException ex = new WorkerGroupNotFoundException("no
worker group");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient,
atLeast(2)).dispatch(taskExecutionRunnable);
+ verify(taskExecutionRunnable.getWorkflowEventBus(),
never())
+ .publish(any(TaskFailedLifecycleEvent.class));
+ });
+ }
+
+ @Test
+ void
dispatchTask_WorkerGroupNotFound_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent()
throws TaskDispatchException {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() -
500);
+ WorkerGroupNotFoundException ex = new
WorkerGroupNotFoundException("worker group not found");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient,
times(1)).dispatch(taskExecutionRunnable);
+
verify(taskExecutionRunnable.getWorkflowEventBus()).publish(
+ argThat(event -> event instanceof
TaskFailedLifecycleEvent &&
+ ((TaskFailedLifecycleEvent) event)
+ .getTaskExecutionRunnable() ==
taskExecutionRunnable));
+ });
+ }
+
+ @Test
+ void
dispatchTask_WorkerGroupNotFound_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent()
throws TaskDispatchException, InterruptedException {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(1));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() -
100);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ countDownLatch.countDown();
+ throw new WorkerGroupNotFoundException("Worker group 'TestGroup'
does not exist");
+ }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
+ verify(taskExecutionRunnable.getWorkflowEventBus(),
never()).publish(any(TaskFailedLifecycleEvent.class));
+ }
+
+ @Test
+ void dispatchTask_NoAvailableWorker_TimeoutDisabled_ShouldKeepRetrying()
throws TaskDispatchException {
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+ NoAvailableWorkerException ex = new NoAvailableWorkerException("no
worker");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient,
atLeast(2)).dispatch(taskExecutionRunnable);
+ verify(taskExecutionRunnable.getWorkflowEventBus(),
never())
+ .publish(any(TaskFailedLifecycleEvent.class));
+ });
+ }
+
+ @Test
+ void
dispatchTask_NoAvailableWorker_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent()
throws TaskDispatchException {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() -
500);
+ NoAvailableWorkerException ex = new NoAvailableWorkerException("no
worker");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient,
times(1)).dispatch(taskExecutionRunnable);
+
verify(taskExecutionRunnable.getWorkflowEventBus()).publish(
+ argThat(event -> event instanceof
TaskFailedLifecycleEvent &&
+ ((TaskFailedLifecycleEvent) event)
+ .getTaskExecutionRunnable() ==
taskExecutionRunnable));
+ });
+ }
+
+ @Test
+ void
dispatchTask_NoAvailableWorker_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent()
throws TaskDispatchException, InterruptedException {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(1));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() -
100);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ countDownLatch.countDown();
+ throw new NoAvailableWorkerException("no worker");
+ }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
+ verify(taskExecutionRunnable.getWorkflowEventBus(),
never()).publish(any(TaskFailedLifecycleEvent.class));
+ }
+
+ @Test
+ void
dispatchTask_GenericTaskDispatchException_TimeoutDisabled_ShouldKeepRetrying()
throws TaskDispatchException {
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+ TaskDispatchException ex = new TaskDispatchException("generic dispatch
error");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient,
atLeast(2)).dispatch(taskExecutionRunnable);
+ verify(taskExecutionRunnable.getWorkflowEventBus(),
never())
+ .publish(any(TaskFailedLifecycleEvent.class));
+ });
+ }
+
+ @Test
+ void
dispatchTask_GenericTaskDispatchException_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent()
throws TaskDispatchException {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() -
500);
+ TaskDispatchException ex = new TaskDispatchException("generic dispatch
error");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient,
times(1)).dispatch(taskExecutionRunnable);
+
verify(taskExecutionRunnable.getWorkflowEventBus()).publish(
+ argThat(event -> event instanceof
TaskFailedLifecycleEvent &&
+ ((TaskFailedLifecycleEvent) event)
+ .getTaskExecutionRunnable() ==
taskExecutionRunnable));
+ });
+ }
+
+ @Test
+ void
dispatchTask_GenericTaskDispatchException_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent()
throws TaskDispatchException, InterruptedException {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(1));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() -
100);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ countDownLatch.countDown();
+ throw new TaskDispatchException("Generic dispatch error");
+ }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
+ verify(taskExecutionRunnable.getWorkflowEventBus(),
never()).publish(any(TaskFailedLifecycleEvent.class));
+ }
+
+ // Helper to mock TaskExecutionRunnable with firstDispatchTime
+ private ITaskExecutionRunnable
mockTaskExecutionRunnableWithFirstDispatchTime(long firstDispatchTime) {
+ ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+ WorkflowInstance workflowInstance = mock(WorkflowInstance.class);
+ WorkflowEventBus eventBus = mock(WorkflowEventBus.class);
+
+ TaskExecutionContext context = mock(TaskExecutionContext.class);
+ when(context.getFirstDispatchTime()).thenReturn(firstDispatchTime);
+
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+
when(taskExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance);
+ when(taskExecutionRunnable.getWorkflowEventBus()).thenReturn(eventBus);
+
when(taskExecutionRunnable.getId()).thenReturn(ThreadLocalRandom.current().nextInt(1000,
9999));
+
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(context);
+
+ return taskExecutionRunnable;
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 8fa8ecf68c..6277351c29 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -34,6 +34,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
import
org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
@@ -1578,4 +1579,162 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
});
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test start a workflow whose task specifies a non-existent
worker group when dispatch timeout is enabled")
+ public void testTaskFail_with_workerGroupNotFoundAndTimeoutEnabled() {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
+ this.masterConfig.setTaskDispatchPolicy(taskDispatchPolicy);
+
+ final String yaml =
"/it/start/workflow_with_worker_group_not_found.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getWorkerGroup()).isEqualTo("workerGroupNotFound");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.FAILURE));
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow whose task specifies a non-existent
worker group when dispatch timeout is disabled")
+ public void
testTaskRemainsSubmittedSuccess_with_workerGroupNotFoundAndTimeoutDisabled() {
+ TaskDispatchPolicy policy = new TaskDispatchPolicy();
+ policy.setDispatchTimeoutEnabled(false);
+ this.masterConfig.setTaskDispatchPolicy(policy);
+
+ final String yaml =
"/it/start/workflow_with_worker_group_not_found.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getWorkerGroup()).isEqualTo("workerGroupNotFound");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUBMITTED_SUCCESS);
+ });
+
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
+
+ });
+
+ // This test intentionally leaves the workflow running, so we skip the
resource cleanup check.
+ // masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow when no available worker and dispatch
timeout is enabled")
+ public void testTaskFail_with_noAvailableWorkerAndTimeoutEnabled() {
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
+ this.masterConfig.setTaskDispatchPolicy(taskDispatchPolicy);
+
+ final String yaml = "/it/start/workflow_with_no_available_worker.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.FAILURE));
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow when no available worker and dispatch
timeout is disabled")
+ public void
testTaskRemainsSubmittedSuccess_with_noAvailableWorkerAndTimeoutDisabled() {
+ TaskDispatchPolicy policy = new TaskDispatchPolicy();
+ policy.setDispatchTimeoutEnabled(false);
+ this.masterConfig.setTaskDispatchPolicy(policy);
+
+ final String yaml = "/it/start/workflow_with_no_available_worker.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUBMITTED_SUCCESS);
+ });
+
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
+ });
+
+ // This test intentionally leaves the workflow running, so we skip the
resource cleanup check.
+ // masterContainer.assertAllResourceReleased();
+ }
+
}
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml
b/dolphinscheduler-master/src/test/resources/application.yaml
index 3aca1e3142..f485dd0c5f 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -73,6 +73,11 @@ master:
cpu-usage-weight: 30
task-thread-pool-usage-weight: 30
worker-group-refresh-interval: 5m
+ # Task dispatch timeout check (disabled in this test configuration).
+ # When enabled, a task that cannot be dispatched within
+ # max-task-dispatch-duration is marked as failed.
+ task-dispatch-policy:
+ dispatch-timeout-enabled: false
+ max-task-dispatch-duration: 1h
command-fetch-strategy:
type: ID_SLOT_BASED
config:
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_no_available_worker.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_no_available_worker.yaml
new file mode 100644
index 0000000000..a4688200fb
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_no_available_worker.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_no_available_worker
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: SHELL
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
new file mode 100644
index 0000000000..abb7a05e0b
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_worker_group_not_found
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: SHELL
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: workerGroupNotFound
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index a8f46435c1..94b4afd2ab 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -126,6 +126,8 @@ public class TaskExecutionContext implements Serializable {
private boolean failover;
    /**
     * Wall-clock time (epoch millis) captured when this context object is instantiated.
     * <p>
     * NOTE(review): despite the name, this is the construction time of the context, not the time of the first
     * dispatch attempt. Presumably the master measures its dispatch timeout from this value. If a new context is
     * built for a retry or a failover, that window restarts; confirm this is intended.
     */
    private final long firstDispatchTime = System.currentTimeMillis();

    /**
     * Increments the dispatch failure counter.
     *
     * @return the counter value after the increment
     */
    public int increaseDispatchFailTimes() {
        return ++dispatchFailTimes;
    }