This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6b03838326 [Fix-17834] [Master] Fix workflow failure strategy cannot 
work (#17851)
6b03838326 is described below

commit 6b038383265bb3849252e2c95770d5d43b2df44c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Jan 10 11:49:04 2026 +0800

    [Fix-17834] [Master] Fix workflow failure strategy cannot work (#17851)
---
 .../task/runnable/ITaskExecutionRunnable.java      |   2 +
 .../task/runnable/TaskExecutionRunnable.java       |   5 +
 .../task/statemachine/AbstractTaskStateAction.java |  17 ++--
 .../task/statemachine/TaskKillStateAction.java     |   2 +-
 .../task/statemachine/TaskPauseStateAction.java    |   2 +-
 .../policy/ContinueWorkflowFailureStrategy.java    |  41 +++++++++
 .../policy/EndWorkflowFailureStrategy.java         |  46 ++++++++++
 .../workflow/policy/IWorkflowFailureStrategy.java  |  34 +++++++
 .../policy/WorkflowFailureStrategyFactory.java     |  31 +++++++
 .../runnable/IWorkflowExecutionRunnable.java       |  10 ++
 .../runnable/WorkflowExecutionRunnable.java        |  11 ++-
 .../statemachine/AbstractWorkflowStateAction.java  |  29 +++---
 .../statemachine/WorkflowReadyStopStateAction.java |   2 +-
 .../statemachine/WorkflowRunningStateAction.java   |   2 +-
 .../master/runner/WorkflowExecuteContext.java      |   4 +-
 .../master/integration/WorkflowOperator.java       |   5 +
 .../integration/cases/WorkflowStartTestCase.java   |  40 ++++++++
 ..._parallel_fake_task_using_failure_strategy.yaml | 101 +++++++++++++++++++++
 .../plugin/task/api/enums/TaskExecutionStatus.java |   1 +
 19 files changed, 353 insertions(+), 32 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
index 2e75abffa4..a67788e79e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
@@ -67,6 +67,8 @@ public interface ITaskExecutionRunnable
      */
     boolean isTaskInstanceCanRetry();
 
+    boolean isFailure();
+
     /**
      * Retry the TaskExecutionRunnable.
      * <p> Will create retry task instance and start it.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
index 50b4e15a14..8e93892f77 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
@@ -96,6 +96,11 @@ public class TaskExecutionRunnable implements 
ITaskExecutionRunnable {
         return taskInstance.getRetryTimes() < taskInstance.getMaxRetryTimes();
     }
 
+    @Override
+    public boolean isFailure() {
+        return isTaskInstanceInitialized() && !isTaskInstanceCanRetry() && 
taskInstance.getState().isFailure();
+    }
+
     @Override
     public void retry() {
         checkState(isTaskInstanceInitialized(), "The task instance is not 
initialized, can't initialize retry task.");
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index ad0e652d48..422087c5fe 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -136,7 +136,7 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
         releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
         persistentTaskInstancePausedEventToDB(taskExecutionRunnable, 
taskPausedEvent);
         
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable);
-        
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
     }
 
     private void persistentTaskInstancePausedEventToDB(final 
ITaskExecutionRunnable taskExecutionRunnable,
@@ -153,7 +153,7 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
         releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
         persistentTaskInstanceKilledEventToDB(taskExecutionRunnable, 
taskInstanceKillEvent);
         
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable);
-        
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
     }
 
     private void persistentTaskInstanceKilledEventToDB(final 
ITaskExecutionRunnable taskExecutionRunnable,
@@ -181,11 +181,12 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
         final IWorkflowExecutionGraph workflowExecutionGraph = 
taskExecutionRunnable.getWorkflowExecutionGraph();
         if 
(workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) 
{
             mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, 
taskExecutionRunnable);
-            
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+            
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
             return;
         }
+        // todo: Use a plugin to extend the action strategy on xxEvent.
         
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
-        
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
     }
 
     private void persistentTaskInstanceFailedEventToDB(final 
ITaskExecutionRunnable taskExecutionRunnable,
@@ -203,7 +204,7 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
         releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
         persistentTaskInstanceSuccessEventToDB(taskExecutionRunnable, 
taskSuccessEvent);
         mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, 
taskExecutionRunnable);
-        
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
     }
 
     protected void mergeTaskVarPoolToWorkflow(final IWorkflowExecutionRunnable 
workflowExecutionRunnable,
@@ -244,9 +245,9 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
         
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
     }
 
-    protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(final 
ITaskExecutionRunnable taskExecutionRunnable) {
-        final Integer workflowInstanceId = 
taskExecutionRunnable.getWorkflowInstance().getId();
-        final IWorkflowExecutionRunnable workflowExecutionRunnable = 
workflowRepository.get(workflowInstanceId);
+    protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(
+                                                                         final 
IWorkflowExecutionRunnable workflowExecutionRunnable,
+                                                                         final 
ITaskExecutionRunnable taskExecutionRunnable) {
         taskExecutionRunnable
                 .getWorkflowEventBus()
                 .publish(
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
index a2b6c76701..0cabedf9ba 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
@@ -47,7 +47,7 @@ public class TaskKillStateAction extends 
AbstractTaskStateAction {
                              final TaskStartLifecycleEvent taskStartEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
         
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable);
-        
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
index f6acf7155d..ae2251944f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
@@ -47,7 +47,7 @@ public class TaskPauseStateAction extends 
AbstractTaskStateAction {
                              final TaskStartLifecycleEvent taskStartEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
         
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable);
-        
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
 taskExecutionRunnable);
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/ContinueWorkflowFailureStrategy.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/ContinueWorkflowFailureStrategy.java
new file mode 100644
index 0000000000..927a19b8bf
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/ContinueWorkflowFailureStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+/**
+ * The strategy used to deal with {@link 
org.apache.dolphinscheduler.common.enums.FailureStrategy#CONTINUE} when task 
failure occurs.
+ * <p> Will wait for the active tasks to finish.
+ */
+public class ContinueWorkflowFailureStrategy implements 
IWorkflowFailureStrategy {
+
+    @Override
+    public void onTaskFailure(IWorkflowExecutionRunnable 
workflowExecutionRunnable,
+                              ITaskExecutionRunnable taskExecutionRunnable) {
+        // do nothing, just continue workflow execution
+    }
+
+    @Override
+    public boolean canTriggerSuccessor(IWorkflowExecutionRunnable 
workflowExecutionRunnable,
+                                       ITaskExecutionRunnable 
taskExecutionRunnable) {
+        return true;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/EndWorkflowFailureStrategy.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/EndWorkflowFailureStrategy.java
new file mode 100644
index 0000000000..142f0dc982
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/EndWorkflowFailureStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The strategy used to deal with {@link 
org.apache.dolphinscheduler.common.enums.FailureStrategy#END} when task failure 
occurs.
+ * <p> Will kill the active tasks and end the workflow execution.
+ */
+@Slf4j
+public class EndWorkflowFailureStrategy implements IWorkflowFailureStrategy {
+
+    @Override
+    public void onTaskFailure(IWorkflowExecutionRunnable 
workflowExecutionRunnable,
+                              ITaskExecutionRunnable taskExecutionRunnable) {
+        log.info("The workflow instance: [{}] using END failure strategy, will 
kill the active tasks.",
+                workflowExecutionRunnable.getName());
+        workflowExecutionRunnable.killActiveTasks();
+    }
+
+    @Override
+    public boolean canTriggerSuccessor(IWorkflowExecutionRunnable 
workflowExecutionRunnable,
+                                       ITaskExecutionRunnable 
taskExecutionRunnable) {
+        return 
!workflowExecutionRunnable.getWorkflowExecutionGraph().isExistFailureTaskExecutionRunnableChain();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/IWorkflowFailureStrategy.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/IWorkflowFailureStrategy.java
new file mode 100644
index 0000000000..bb6f36176a
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/IWorkflowFailureStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+/**
+ * Used to deal with {@link 
org.apache.dolphinscheduler.common.enums.FailureStrategy} when task failure 
occurs
+ */
+public interface IWorkflowFailureStrategy {
+
+    void onTaskFailure(IWorkflowExecutionRunnable workflowExecutionRunnable,
+                       ITaskExecutionRunnable taskExecutionRunnable);
+
+    boolean canTriggerSuccessor(IWorkflowExecutionRunnable 
workflowExecutionRunnable,
+                                ITaskExecutionRunnable taskExecutionRunnable);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/WorkflowFailureStrategyFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/WorkflowFailureStrategyFactory.java
new file mode 100644
index 0000000000..faba884bbf
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/WorkflowFailureStrategyFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+
+public class WorkflowFailureStrategyFactory {
+
+    public static IWorkflowFailureStrategy getStrategy(final FailureStrategy 
failureStrategy) {
+        if (failureStrategy == FailureStrategy.END) {
+            return new EndWorkflowFailureStrategy();
+        }
+        return new ContinueWorkflowFailureStrategy();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
index 199edd645a..c48d059592 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
@@ -21,7 +21,9 @@ import 
org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
 import 
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
 import 
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
 
 import java.util.List;
@@ -68,6 +70,13 @@ public interface IWorkflowExecutionRunnable {
         return workflowExecutionStatus == WorkflowExecutionStatus.READY_STOP;
     }
 
+    /**
+     * Kill the active tasks of the WorkflowExecutionRunnable.
+     */
+    default void killActiveTasks() {
+        
getWorkflowExecutionGraph().getActiveTaskExecutionRunnable().forEach(ITaskExecutionRunnable::kill);
+    }
+
     /**
      * Get the WorkflowExecuteContext belongs to the WorkflowExecutionRunnable.
      */
@@ -111,4 +120,5 @@ public interface IWorkflowExecutionRunnable {
      */
     void registerWorkflowInstanceLifecycleListener(IWorkflowLifecycleListener 
listener);
 
+    IWorkflowFailureStrategy getWorkflowFailureStrategy();
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
index 7fdd59b2e6..c819fecf80 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
@@ -23,8 +23,11 @@ import 
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.policy.WorkflowFailureStrategyFactory;
 import 
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import lombok.Getter;
@@ -39,9 +42,15 @@ public class WorkflowExecutionRunnable implements 
IWorkflowExecutionRunnable {
     @Getter
     private final List<IWorkflowLifecycleListener> 
workflowInstanceLifecycleListeners;
 
+    @Getter
+    private final IWorkflowFailureStrategy workflowFailureStrategy;
+
     public WorkflowExecutionRunnable(WorkflowExecutionRunnableBuilder 
workflowExecutionRunnableBuilder) {
         this.workflowExecuteContext = 
workflowExecutionRunnableBuilder.getWorkflowExecuteContextBuilder().build();
-        this.workflowInstanceLifecycleListeners = 
workflowExecuteContext.getWorkflowInstanceLifecycleListeners();
+        this.workflowInstanceLifecycleListeners =
+                new 
ArrayList<>(workflowExecuteContext.getWorkflowInstanceLifecycleListeners());
+        this.workflowFailureStrategy = WorkflowFailureStrategyFactory
+                
.getStrategy(workflowExecuteContext.getWorkflowInstance().getFailureStrategy());
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index 488e3aabe9..459abc8a64 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -34,6 +34,7 @@ import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
+import 
org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
 import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
 import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
@@ -103,18 +104,6 @@ public abstract class AbstractWorkflowStateAction 
implements IWorkflowStateActio
         }
     }
 
-    protected void killActiveTask(final IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
-        try {
-            
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
-            workflowExecutionRunnable
-                    .getWorkflowExecutionGraph()
-                    .getActiveTaskExecutionRunnable()
-                    .forEach(ITaskExecutionRunnable::kill);
-        } finally {
-            LogUtils.removeWorkflowInstanceIdMDC();
-        }
-    }
-
     protected void pauseActiveTask(final IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
         try {
             
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
@@ -129,21 +118,25 @@ public abstract class AbstractWorkflowStateAction 
implements IWorkflowStateActio
 
     protected void tryToTriggerSuccessorsAfterTaskFinish(final 
IWorkflowExecutionRunnable workflowExecutionRunnable,
                                                          final 
ITaskExecutionRunnable taskExecutionRunnable) {
+        successorFlowAdjuster.adjustSuccessorFlow(taskExecutionRunnable);
+
+        final IWorkflowFailureStrategy workflowFailureStrategy = 
workflowExecutionRunnable.getWorkflowFailureStrategy();
+        if (taskExecutionRunnable.isFailure()) {
+            workflowFailureStrategy.onTaskFailure(workflowExecutionRunnable, 
taskExecutionRunnable);
+        }
+
         final IWorkflowExecutionGraph workflowExecutionGraph = 
workflowExecutionRunnable.getWorkflowExecutionGraph();
         if (workflowExecutionGraph.isEndOfTaskChain(taskExecutionRunnable)) {
             emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
             return;
         }
 
-        successorFlowAdjuster.adjustSuccessorFlow(taskExecutionRunnable);
-        final List<ITaskExecutionRunnable> successors = 
workflowExecutionGraph.getSuccessors(taskExecutionRunnable);
-        if (successors.isEmpty()) {
-            log.debug("The task: {} has no successor, try to emit workflow 
finished event",
-                    taskExecutionRunnable.getName());
+        if 
(!workflowFailureStrategy.canTriggerSuccessor(workflowExecutionRunnable, 
taskExecutionRunnable)) {
             emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
             return;
         }
-        triggerTasks(workflowExecutionRunnable, successors);
+
+        triggerTasks(workflowExecutionRunnable, 
workflowExecutionGraph.getSuccessors(taskExecutionRunnable));
     }
 
     protected void workflowFinish(final IWorkflowExecutionRunnable 
workflowExecutionRunnable,
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
index ee6867ed5f..7cff982e07 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
@@ -85,7 +85,7 @@ public class WorkflowReadyStopStateAction extends 
AbstractWorkflowStateAction {
     public void onStopEvent(final IWorkflowExecutionRunnable 
workflowExecutionRunnable,
                             final WorkflowStopLifecycleEvent 
workflowStopEvent) {
         throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
-        super.killActiveTask(workflowExecutionRunnable);
+        workflowExecutionRunnable.killActiveTasks();
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
index b14b9353b4..16839f378a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
@@ -88,7 +88,7 @@ public class WorkflowRunningStateAction extends 
AbstractWorkflowStateAction {
                             final WorkflowStopLifecycleEvent 
workflowStopEvent) {
         throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
         super.transformWorkflowInstanceState(workflowExecutionRunnable, 
WorkflowExecutionStatus.READY_STOP);
-        super.killActiveTask(workflowExecutionRunnable);
+        workflowExecutionRunnable.killActiveTasks();
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
index 67e83b9495..0727ff5e98 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
@@ -26,7 +26,9 @@ import 
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecution
 import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -91,7 +93,7 @@ public class WorkflowExecuteContext implements 
IWorkflowExecuteContext {
                     workflowGraph,
                     workflowExecutionGraph,
                     workflowEventBus,
-                    workflowInstanceLifecycleListeners);
+                    
Optional.ofNullable(workflowInstanceLifecycleListeners).orElse(Collections.emptyList()));
         }
     }
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index b3bf673b1f..aaac0b459d 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.integration;
 
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -65,6 +66,7 @@ public class WorkflowOperator {
                 
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
                 .dryRun(workflowTriggerDTO.getDryRun())
                 .taskDependType(workflowTriggerDTO.getTaskDependType())
+                .failureStrategy(workflowTriggerDTO.getFailureStrategy())
                 .build();
 
         final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -155,6 +157,9 @@ public class WorkflowOperator {
 
         @Builder.Default
         private TaskDependType taskDependType = TaskDependType.TASK_POST;
+
+        @Builder.Default
+        private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
     }
 
     @Data
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index a2f0331f34..3c628189df 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -20,6 +20,7 @@ package 
org.apache.dolphinscheduler.server.master.integration.cases;
 import static com.google.common.truth.Truth.assertThat;
 import static org.awaitility.Awaitility.await;
 
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@@ -250,6 +251,45 @@ public class WorkflowStartTestCase extends 
AbstractMasterIntegrationTestCase {
         masterContainer.assertAllResourceReleased();
     }
 
+    @Test
+    @DisplayName("Test start a workflow with three parallel fake tasks(A, B, C) using end 
failure strategy")
+    public void testStartWorkflow_with_threeFakeTask_usingFailureStrategyEnd() 
{
+        final String yaml = 
"/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .failureStrategy(FailureStrategy.END)
+                .build();
+        final Integer workflowInstanceId = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+                            .isEqualTo(WorkflowExecutionStatus.FAILURE);
+                    
Assertions.assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(3)
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("A");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("B");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("C");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                            });
+
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
     @Test
     @DisplayName("Test start a workflow with two fake task(A) using task 
group")
     public void testStartWorkflow_with_successTaskUsingTaskGroup() {
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml
new file mode 100644
index 0000000000..8d459e1732
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_three_parallel_fake_task
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with three parallel tasks, two long-running tasks (A, B) and one failing task (C)
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 60"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: B
+    code: 2
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 60"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: C
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"xx"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 2
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
index 5a7c42bbc6..7ea0d60b1b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
@@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
 
 public enum TaskExecutionStatus {
 
+    // TODO: refactor these states in the same way as WorkflowExecutionStatus
     SUBMITTED_SUCCESS(0, "submit success"),
     RUNNING_EXECUTION(1, "running"),
     PAUSE(3, "pause"),


Reply via email to