This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5aeca87823 [Fix-17758][Master] Mark task as failed if TaskExecutionContext initialization fails (#17821)
5aeca87823 is described below
commit 5aeca878231200b6fe567bc707dd357b409851cd
Author: njnu-seafish <[email protected]>
AuthorDate: Wed Jan 14 10:47:51 2026 +0800
[Fix-17758][Master] Mark task as failed if TaskExecutionContext initialization fails (#17821)
---
.../master/engine/WorkflowEventBusFireWorker.java | 14 +++
.../task/lifecycle/TaskLifecycleEventType.java | 4 +
.../lifecycle/event/TaskFatalLifecycleEvent.java | 52 +++++++++++
.../handler/TaskFatalLifecycleEventHandler.java | 44 +++++++++
.../task/statemachine/AbstractTaskStateAction.java | 34 +++++++
.../engine/task/statemachine/ITaskStateAction.java | 9 ++
.../statemachine/TaskSubmittedStateAction.java | 14 ++-
.../TaskExecutionContextCreateException.java | 2 +-
.../server/master/utils/ExceptionUtils.java | 5 +
.../integration/cases/WorkflowStartTestCase.java | 103 +++++++++++++++++++++
...tion_task_with_one_fake_predecessor_fatal.yaml} | 23 +++--
.../start/workflow_with_one_fake_task_fatal.yaml | 62 +++++++++++++
...tion_task_with_one_fake_predecessor_failed.yaml | 4 +-
...tion_task_with_one_fake_predecessor_fatal.yaml} | 21 +++--
14 files changed, 365 insertions(+), 26 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
index 905f4b7842..2e1c807edf 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.engine.exceptions.WorkflowEventFireException;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
@@ -32,6 +34,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -128,6 +131,17 @@ public class WorkflowEventBusFireWorker {
ThreadUtils.sleep(5_000);
return;
}
+
+ // If task context init fails, publish a fatal error event
+ if (ExceptionUtils.isTaskExecutionContextCreateException(ex)) {
+ AbstractTaskLifecycleEvent taskLifecycleEvent =
(AbstractTaskLifecycleEvent) lifecycleEvent;
+ final TaskFatalLifecycleEvent taskFatalEvent =
TaskFatalLifecycleEvent.builder()
+
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
+ .endTime(new Date())
+ .build();
+ workflowEventBus.publish(taskFatalEvent);
+ }
+
workflowEventBus.getWorkflowEventBusSummary().decreaseFireSuccessEventCount();
workflowEventBus.getWorkflowEventBusSummary().increaseFireFailedEventCount();
throw new WorkflowEventFireException(lifecycleEvent, ex);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java
index 5ddcf13c89..fb12ccb603 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java
@@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements
ILifecycleEventType {
* Dispatch the task instance to target.
*/
DISPATCH,
+ /**
+ * The task instance encountered a catastrophic failure (such as an initialization failure) and will enter a failed state.
+ */
+ FATAL,
/**
* The task instance is dispatched to the target executor server.
*/
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskFatalLifecycleEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskFatalLifecycleEvent.java
new file mode 100644
index 0000000000..88cfc9a478
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskFatalLifecycleEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;
+
+import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+@AllArgsConstructor
+public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent {
+
+ private final ITaskExecutionRunnable taskExecutionRunnable;
+
+ private final Date endTime;
+
+ @Override
+ public ILifecycleEventType getEventType() {
+ return TaskLifecycleEventType.FATAL;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskFatalLifecycleEvent{" +
+ "task=" + taskExecutionRunnable.getName() +
+ ", endTime=" + endTime +
+ '}';
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskFatalLifecycleEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskFatalLifecycleEventHandler.java
new file mode 100644
index 0000000000..8277d8b2ca
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskFatalLifecycleEventHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;
+
+import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.engine.task.statemachine.ITaskStateAction;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskFatalLifecycleEventHandler extends
AbstractTaskLifecycleEventHandler<TaskFatalLifecycleEvent> {
+
+ @Override
+ public void handle(final ITaskStateAction taskStateAction,
+ final IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ final ITaskExecutionRunnable taskExecutionRunnable,
+ final TaskFatalLifecycleEvent taskFatalEvent) {
+ taskStateAction.onFatalEvent(workflowExecutionRunnable,
taskExecutionRunnable, taskFatalEvent);
+ }
+
+ @Override
+ public ILifecycleEventType matchEventType() {
+ return TaskLifecycleEventType.FATAL;
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 422087c5fe..88281145c6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -36,6 +36,7 @@ import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecuto
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -99,6 +100,39 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
}
}
+ @Override
+ public void onFatalEvent(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ final ITaskExecutionRunnable
taskExecutionRunnable,
+ final TaskFatalLifecycleEvent taskFatalEvent) {
+ releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
+ persistentTaskInstanceFatalEventToDB(taskExecutionRunnable,
taskFatalEvent);
+
+ if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
+
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
+ return;
+ }
+
+ // If all successors are condition tasks, the task chain is not marked as failed in the
+ // workflow execution graph, so the DAG continues to execute.
+ final IWorkflowExecutionGraph workflowExecutionGraph =
taskExecutionRunnable.getWorkflowExecutionGraph();
+ if
(workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable))
{
+ mergeTaskVarPoolToWorkflow(workflowExecutionRunnable,
taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
+ return;
+ }
+
+
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
+ }
+
+ private void persistentTaskInstanceFatalEventToDB(final
ITaskExecutionRunnable taskExecutionRunnable,
+ final
TaskFatalLifecycleEvent taskFatalEvent) {
+ final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
+ taskInstance.setState(TaskExecutionStatus.FAILURE);
+ taskInstance.setEndTime(taskFatalEvent.getEndTime());
+ taskInstanceDao.updateById(taskInstance);
+ }
+
@Override
public void onDispatchedEvent(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final ITaskExecutionRunnable
taskExecutionRunnable,
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java
index a041de5e3c..f60c3ae135 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java
@@ -22,6 +22,7 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -91,6 +92,14 @@ public interface ITaskStateAction {
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskDispatchLifecycleEvent taskDispatchEvent);
+ /**
+ * Perform the necessary actions when the task in a certain state receive
a {@link TaskFatalLifecycleEvent}.
+ * <p> This method is called when the task encounters catastrophic failure
(e.g., initialization failure).
+ */
+ void onFatalEvent(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ final ITaskExecutionRunnable taskExecutionRunnable,
+ final TaskFatalLifecycleEvent taskFatalEvent);
+
/**
* Perform the necessary actions when the task in a certain state receive
a {@link TaskDispatchedLifecycleEvent}.
* <p> This method is called when the task has been dispatched to executor.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index 327cd94258..4169a2e436 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -36,6 +36,8 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
import java.util.concurrent.TimeUnit;
@@ -109,7 +111,17 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
taskInstance.getDelayTime(),
remainTimeMills);
}
- taskExecutionRunnable.initializeTaskExecutionContext();
+
+ try {
+ taskExecutionRunnable.initializeTaskExecutionContext();
+ } catch (Exception ex) {
+ if (ExceptionUtils.isDatabaseConnectedFailedException(ex)) {
+ throw ex;
+ }
+ log.error("Failed to initialize task execution context, taskName:
{}", taskInstance.getName(), ex);
+ throw new TaskExecutionContextCreateException(ex.getMessage());
+ }
+
workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable,
remainTimeMills);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
index ac37c94438..2a54ea93bf 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.exception;
-public class TaskExecutionContextCreateException extends MasterException {
+public class TaskExecutionContextCreateException extends RuntimeException {
public TaskExecutionContextCreateException(String message) {
super(message);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
index 9103bc5075..07156b58ab 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.utils;
+import
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+
import org.springframework.dao.DataAccessResourceFailureException;
public class ExceptionUtils {
@@ -25,4 +27,7 @@ public class ExceptionUtils {
return e instanceof DataAccessResourceFailureException;
}
+ public static boolean isTaskExecutionContextCreateException(Throwable e) {
+ return e instanceof TaskExecutionContextCreateException;
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 3c628189df..8fa8ecf68c 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -889,6 +889,36 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow with one fake task(A) fatal")
+ public void testStartWorkflow_with_oneFatalTask() {
+ final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.FAILURE));
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .satisfiesExactly(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with one fake task(A) failed")
public void testStartWorkflow_with_oneFailedTaskWithRetry() {
@@ -1443,6 +1473,46 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow with one condition task(B) when one
fake predecessor task(A) run fatal")
+ void
testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
+ final String yaml =
"/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(parentWorkflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+
+ Assertions
+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .hasSize(3)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("B");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("D");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with one condition task(B) which is
forbidden when one fake predecessor task(A) run failed")
void
testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed()
{
@@ -1475,4 +1545,37 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
});
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test start a workflow with one condition task(B) which is
forbidden when one fake predecessor task(A) run fatal")
+ void
testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal()
{
+ final String yaml =
+
"/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(parentWorkflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
+
+ Assertions
+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .hasSize(1)
+ .satisfiesExactly(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml
similarity index 89%
copy from
dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
copy to
dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml
index c312175c38..b38971a308 100644
---
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml
@@ -15,8 +15,7 @@
# limitations under the License.
#
-# A(failed) -> B(Condition)(forbidden) -> C(success)
-# -> D(failed)
+# A(fatal-failed) -> B(success) -> D(success)
project:
name: MasterIntegrationTest
code: 1
@@ -24,17 +23,17 @@ project:
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
workflows:
- - name: workflow_with_one_condition_task_with_one_fake_predecessor_failed
+ - name: workflow_with_one_condition_task_with_one_fake_predecessor_fatal
code: 1
version: 1
projectCode: 1
- description: This is a fake workflow with one condition task which has one
predecessor failed
+ description: This is a fake workflow with one condition task that has one
predecessor that failed fatally
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
userId: 1
executionType: PARALLEL
@@ -45,10 +44,11 @@ tasks:
projectCode: 1
userId: 1
taskType: LogicFakeTask
- taskParams: '{"localParams":null,"varPool":[],"shellScript":"error"}'
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
workerGroup: default
+ environmentCode: 144873539254368
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
@@ -59,9 +59,8 @@ tasks:
taskParams:
'{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
workerGroup: default
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
- flag: NO
- name: C
code: 3
version: 1
@@ -71,7 +70,7 @@ tasks:
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
- name: D
code: 4
@@ -82,7 +81,7 @@ tasks:
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_fatal.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_fatal.yaml
new file mode 100644
index 0000000000..bbd5adf848
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_fatal.yaml
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_fatal
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5 &&
echo success"}'
+ workerGroup: default
+ environmentCode: 144873539254368
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
index c312175c38..bd5a9d360e 100644
---
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
@@ -27,11 +27,11 @@ project:
updateTime: 2021-08-12 00:00:00
workflows:
- - name: workflow_with_one_condition_task_with_one_fake_predecessor_failed
+ - name:
workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed
code: 1
version: 1
projectCode: 1
- description: This is a fake workflow with one condition task which has one
predecessor failed
+ description: This is a fake workflow with one condition task which is
forbidden and has one predecessor failed
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml
similarity index 87%
copy from
dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
copy to
dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml
index c312175c38..c892848d9b 100644
---
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml
@@ -24,17 +24,17 @@ project:
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
workflows:
- - name: workflow_with_one_condition_task_with_one_fake_predecessor_failed
+ - name:
workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal
code: 1
version: 1
projectCode: 1
- description: This is a fake workflow with one condition task which has one
predecessor failed
+ description: This is a fake workflow with one forbidden condition task
that has one predecessor that failed fatally
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
userId: 1
executionType: PARALLEL
@@ -45,10 +45,11 @@ tasks:
projectCode: 1
userId: 1
taskType: LogicFakeTask
- taskParams: '{"localParams":null,"varPool":[],"shellScript":"error"}'
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
workerGroup: default
+ environmentCode: 144873539254368
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
@@ -59,7 +60,7 @@ tasks:
taskParams:
'{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
workerGroup: default
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
flag: NO
- name: C
@@ -71,7 +72,7 @@ tasks:
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
- name: D
code: 4
@@ -82,7 +83,7 @@ tasks:
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
- updateTime: 2021-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
@@ -121,4 +122,4 @@ taskRelations:
postTaskCode: 4
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
- updateTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
\ No newline at end of file