This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7dc6a4a41d [Fix-17817][Master] Add workflow timeout event and handle
(#18063)
7dc6a4a41d is described below
commit 7dc6a4a41df9b75723615582d08cfd6b3e6be8e3
Author: njnu-seafish <[email protected]>
AuthorDate: Thu Apr 2 14:32:35 2026 +0800
[Fix-17817][Master] Add workflow timeout event and handle (#18063)
---
.../dolphinscheduler/common/enums/AlertType.java | 2 +-
.../org/apache/dolphinscheduler/dao/AlertDao.java | 15 ++++-
.../lifecycle/event/TaskTimeoutLifecycleEvent.java | 2 +-
.../lifecycle/WorkflowLifecycleEventType.java | 4 ++
.../event/WorkflowTimeoutLifecycleEvent.java | 70 ++++++++++++++++++++
.../WorkflowStartLifecycleEventHandler.java | 15 +++++
.../WorkflowTimeoutLifecycleEventHandler.java | 76 ++++++++++++++++++++++
.../server/master/integration/Repository.java | 14 ++++
.../master/integration/WorkflowOperator.java | 9 +++
.../integration/cases/WorkflowStartTestCase.java | 42 ++++++++++++
.../workflow_with_workflow_timeout_alert.yaml | 63 ++++++++++++++++++
.../service/alert/WorkflowAlertManager.java | 6 ++
12 files changed, 313 insertions(+), 5 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
index f7a5ba0d39..058afcb3fc 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
@@ -29,7 +29,7 @@ public enum AlertType {
/**
* 0 workflow instance failure, 1 workflow instance success, 2 workflow
instance blocked, 3 workflow instance timeout, 4 fault tolerance warning,
- * 5 task failure, 6 task success, 7 task timeout, 8 close alert
+ * 5 task failure, 6 task success, 7 task timeout
*/
WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"),
WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"),
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index 708c8ad6b8..3b1c45816a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -195,6 +195,13 @@ public class AlertDao {
* @param projectUser projectUser
*/
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance,
ProjectUser projectUser) {
+ if (projectUser == null) {
+ throw new IllegalArgumentException("projectUser must not be null");
+ }
+ if (workflowInstance.getWarningGroupId() == null) {
+ throw new IllegalArgumentException("warningGroupId of the workflow
instance must not be null");
+ }
+
int alertGroupId = workflowInstance.getWarningGroupId();
Alert alert = new Alert();
List<WorkflowAlertContent> workflowAlertContentList = new
ArrayList<>(1);
@@ -220,10 +227,11 @@ public class AlertDao {
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
alert.setWorkflowInstanceId(workflowInstance.getId());
alert.setAlertType(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
- saveTaskTimeoutAlert(alert, content, alertGroupId);
+
+ saveTimeoutAlert(alert, content, alertGroupId);
}
- private void saveTaskTimeoutAlert(Alert alert, String content, int
alertGroupId) {
+ private void saveTimeoutAlert(Alert alert, String content, int
alertGroupId) {
alert.setAlertGroupId(alertGroupId);
alert.setWarningType(WarningType.FAILURE);
alert.setContent(content);
@@ -275,7 +283,8 @@ public class AlertDao {
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
alert.setWorkflowInstanceId(workflowInstance.getId());
alert.setAlertType(AlertType.TASK_TIMEOUT);
- saveTaskTimeoutAlert(alert, content,
workflowInstance.getWarningGroupId());
+
+ saveTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());
}
/**
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
index 5ce6d109a6..29905b71ea 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
@@ -51,7 +51,7 @@ public class TaskTimeoutLifecycleEvent extends
AbstractTaskLifecycleEvent {
final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
checkState(timeoutStrategy != null, "The task timeoutStrategy must not
be null");
- checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0
minutes", timeoutInMinutes);
+ checkState(timeoutInMinutes > 0, "The task timeout: %s must > 0
minutes", timeoutInMinutes);
long delayTime = System.currentTimeMillis() -
taskInstance.getSubmitTime().getTime()
+ TimeUnit.MINUTES.toMillis(timeoutInMinutes);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java
index 95070d6229..aa16f8405b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java
@@ -29,6 +29,10 @@ public enum WorkflowLifecycleEventType implements
ILifecycleEventType {
* Notify the workflow instance there exist a task has been finished, and
should do DAG topology logic transaction.
*/
TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH,
+ /**
+ * Do Timeout strategy of the workflow instance.
+ */
+ TIMEOUT,
/**
* Pause the workflow instance
*/
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java
new file mode 100644
index 0000000000..dc97d5d34e
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class WorkflowTimeoutLifecycleEvent extends
AbstractWorkflowLifecycleLifecycleEvent {
+
+ private IWorkflowExecutionRunnable workflowExecutionRunnable;
+
+ protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ final long timeout) {
+ super(timeout);
+ this.workflowExecutionRunnable = workflowExecutionRunnable;
+ }
+
+ public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable
workflowExecutionRunnable) {
+ final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowInstance();
+ checkState(workflowInstance != null,
+ "The workflow instance must be initialized before creating
workflow timeout event.");
+
+ final int timeout = workflowInstance.getTimeout();
+ checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes",
timeout);
+
+ long delayTime = System.currentTimeMillis() -
workflowInstance.getRestartTime().getTime()
+ + TimeUnit.MINUTES.toMillis(timeout);
+ return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable,
delayTime);
+ }
+
+ @Override
+ public ILifecycleEventType getEventType() {
+ return WorkflowLifecycleEventType.TIMEOUT;
+ }
+
+ @Override
+ public String toString() {
+ return "WorkflowTimeoutLifecycleEvent{" +
+ "workflow=" +
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName()
+
+ '}';
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java
index 2a3aba95e3..ce78aa30de 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java
@@ -17,9 +17,11 @@
package
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
@@ -38,6 +40,7 @@ public class WorkflowStartLifecycleEventHandler
final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final WorkflowStartLifecycleEvent workflowStartEvent) {
+ workflowTimeoutMonitor(workflowExecutionRunnable);
workflowStateAction.onStartEvent(workflowExecutionRunnable,
workflowStartEvent);
}
@@ -45,4 +48,16 @@ public class WorkflowStartLifecycleEventHandler
public ILifecycleEventType matchEventType() {
return WorkflowLifecycleEventType.START;
}
+
+ private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
+ final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowInstance();
+ if (workflowInstance.getTimeout() <= 0) {
+ log.debug("The workflow {} timeout {} is not configured or
invalid, skip timeout monitor.",
+ workflowInstance.getName(),
+ workflowInstance.getTimeout());
+ return;
+ }
+ workflowExecutionRunnable.getWorkflowEventBus()
+
.publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable));
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java
new file mode 100644
index 0000000000..34e405125a
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;
+
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
+import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowTimeoutLifecycleEventHandler
+ extends
+
AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> {
+
+ private final WorkflowAlertManager workflowAlertManager;
+
+ public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager
workflowAlertManager) {
+ this.workflowAlertManager = workflowAlertManager;
+ }
+
+ @Override
+ public void handle(final IWorkflowStateAction workflowStateAction,
+ final IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ final WorkflowTimeoutLifecycleEvent
workflowTimeoutLifecycleEvent) {
+ final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
+ if (workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
+ // all the TaskExecutionRunnable chain in the graph is finish,
means the workflow is already finished.
+ return;
+ }
+
+ final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowInstance();
+ final boolean shouldSendAlert = workflowInstance.getWarningGroupId()
!= null;
+
+ if (shouldSendAlert) {
+ doWorkflowTimeoutAlert(workflowExecutionRunnable);
+ } else {
+ log.info("Skipped sending timeout alert for workflow {} because
warningGroupId is null.",
+ workflowInstance.getName());
+ }
+
+ }
+
+ @Override
+ public ILifecycleEventType matchEventType() {
+ return WorkflowLifecycleEventType.TIMEOUT;
+ }
+
+ private void doWorkflowTimeoutAlert(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
+ final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowInstance();
+ workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance);
+ }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java
index 15c31c0940..4659fbfaaf 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java
@@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.server.master.integration;
+import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
@@ -39,6 +41,9 @@ public class Repository {
@Autowired
private TaskInstanceDao taskInstanceDao;
+ @Autowired
+ private AlertMapper alertMapper;
+
/**
* Return the list of process instances for a given workflow definition in
ascending order of their IDs.
*/
@@ -87,4 +92,13 @@ public class Repository {
return taskInstanceDao.queryAll();
}
+ /**
+ * Return the list of alert for a given workflow instance in ascending
order of their IDs.
+ */
+ public List<Alert> queryAlert(final Integer workflowInstanceId) {
+ return alertMapper.selectByWorkflowInstanceId(workflowInstanceId)
+ .stream()
+ .sorted(Comparator.comparingInt(Alert::getId))
+ .collect(Collectors.toList());
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index aaac0b459d..99341cda33 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.integration;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -67,6 +68,8 @@ public class WorkflowOperator {
.dryRun(workflowTriggerDTO.getDryRun())
.taskDependType(workflowTriggerDTO.getTaskDependType())
.failureStrategy(workflowTriggerDTO.getFailureStrategy())
+ .warningGroupId(workflowTriggerDTO.getWarningGroupId())
+ .warningType(workflowTriggerDTO.getWarningType())
.build();
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -160,6 +163,12 @@ public class WorkflowOperator {
@Builder.Default
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
+
+ @Builder.Default
+ private WarningType warningType = WarningType.NONE;
+
+ @Builder.Default
+ private Integer warningGroupId = null;
}
@Data
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index b19b27ebe8..e5a12c78c2 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -20,6 +20,7 @@ package
org.apache.dolphinscheduler.server.master.integration.cases;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
+import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -1778,4 +1779,45 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
// masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow when timeout should trigger alert when
warningGroupId is set")
+ public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() {
+ final String yaml =
"/it/start/workflow_with_workflow_timeout_alert.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .warningGroupId(workflow.getWarningGroupId())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await().atMost(Duration.ofMinutes(2))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("long_running_task");
+
assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ Assertions
+
.assertThat(repository.queryAlert(workflowInstanceId))
+ .hasSize(1)
+ .anySatisfy(alert -> {
+
assertThat(alert.getTitle()).isEqualTo("Workflow Timeout Warn");
+
assertThat(alert.getProjectCode()).isEqualTo(1);
+
assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(1);
+
assertThat(alert.getAlertType()).isEqualTo(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml
new file mode 100644
index 0000000000..f1353981cf
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project for timeout alert testing
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_timeout_alert
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with timeout alert configuration
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+ timeout: 1
+ warningGroupId: 1
+
+tasks:
+ - name: long_running_task
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":[],"shellScript":"sleep 90","resourceList":[]}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java
index 2a907c48dc..9335e9551e 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java
@@ -202,4 +202,10 @@ public class WorkflowAlertManager {
ProjectUser projectUser =
projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId());
alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance,
projectUser);
}
+
+ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance) {
+ ProjectUser projectUser =
projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId());
+ alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser);
+ }
+
}