This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b03838326 [Fix-17834] [Master] Fix workflow failure strategy cannot
work (#17851)
6b03838326 is described below
commit 6b038383265bb3849252e2c95770d5d43b2df44c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Jan 10 11:49:04 2026 +0800
[Fix-17834] [Master] Fix workflow failure strategy cannot work (#17851)
---
.../task/runnable/ITaskExecutionRunnable.java | 2 +
.../task/runnable/TaskExecutionRunnable.java | 5 +
.../task/statemachine/AbstractTaskStateAction.java | 17 ++--
.../task/statemachine/TaskKillStateAction.java | 2 +-
.../task/statemachine/TaskPauseStateAction.java | 2 +-
.../policy/ContinueWorkflowFailureStrategy.java | 41 +++++++++
.../policy/EndWorkflowFailureStrategy.java | 46 ++++++++++
.../workflow/policy/IWorkflowFailureStrategy.java | 34 +++++++
.../policy/WorkflowFailureStrategyFactory.java | 31 +++++++
.../runnable/IWorkflowExecutionRunnable.java | 10 ++
.../runnable/WorkflowExecutionRunnable.java | 11 ++-
.../statemachine/AbstractWorkflowStateAction.java | 29 +++---
.../statemachine/WorkflowReadyStopStateAction.java | 2 +-
.../statemachine/WorkflowRunningStateAction.java | 2 +-
.../master/runner/WorkflowExecuteContext.java | 4 +-
.../master/integration/WorkflowOperator.java | 5 +
.../integration/cases/WorkflowStartTestCase.java | 40 ++++++++
..._parallel_fake_task_using_failure_strategy.yaml | 101 +++++++++++++++++++++
.../plugin/task/api/enums/TaskExecutionStatus.java | 1 +
19 files changed, 353 insertions(+), 32 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
index 2e75abffa4..a67788e79e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java
@@ -67,6 +67,8 @@ public interface ITaskExecutionRunnable
*/
boolean isTaskInstanceCanRetry();
+ boolean isFailure();
+
/**
* Retry the TaskExecutionRunnable.
* <p> Will create retry task instance and start it.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
index 50b4e15a14..8e93892f77 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
@@ -96,6 +96,11 @@ public class TaskExecutionRunnable implements
ITaskExecutionRunnable {
return taskInstance.getRetryTimes() < taskInstance.getMaxRetryTimes();
}
+ @Override
+ public boolean isFailure() {
+ return isTaskInstanceInitialized() && !isTaskInstanceCanRetry() &&
taskInstance.getState().isFailure();
+ }
+
@Override
public void retry() {
checkState(isTaskInstanceInitialized(), "The task instance is not
initialized, can't initialize retry task.");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index ad0e652d48..422087c5fe 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -136,7 +136,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
persistentTaskInstancePausedEventToDB(taskExecutionRunnable,
taskPausedEvent);
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
}
private void persistentTaskInstancePausedEventToDB(final
ITaskExecutionRunnable taskExecutionRunnable,
@@ -153,7 +153,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
persistentTaskInstanceKilledEventToDB(taskExecutionRunnable,
taskInstanceKillEvent);
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
}
private void persistentTaskInstanceKilledEventToDB(final
ITaskExecutionRunnable taskExecutionRunnable,
@@ -181,11 +181,12 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
final IWorkflowExecutionGraph workflowExecutionGraph =
taskExecutionRunnable.getWorkflowExecutionGraph();
if
(workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable))
{
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable,
taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
return;
}
+ // todo: Use Plugin to extend the act strategy on xxEvent.
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
}
private void persistentTaskInstanceFailedEventToDB(final
ITaskExecutionRunnable taskExecutionRunnable,
@@ -203,7 +204,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
persistentTaskInstanceSuccessEventToDB(taskExecutionRunnable,
taskSuccessEvent);
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable,
taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
}
protected void mergeTaskVarPoolToWorkflow(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
@@ -244,9 +245,9 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
}
- protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(final
ITaskExecutionRunnable taskExecutionRunnable) {
- final Integer workflowInstanceId =
taskExecutionRunnable.getWorkflowInstance().getId();
- final IWorkflowExecutionRunnable workflowExecutionRunnable =
workflowRepository.get(workflowInstanceId);
+ protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(
+ final
IWorkflowExecutionRunnable workflowExecutionRunnable,
+ final
ITaskExecutionRunnable taskExecutionRunnable) {
taskExecutionRunnable
.getWorkflowEventBus()
.publish(
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
index a2b6c76701..0cabedf9ba 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java
@@ -47,7 +47,7 @@ public class TaskKillStateAction extends
AbstractTaskStateAction {
final TaskStartLifecycleEvent taskStartEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
index f6acf7155d..ae2251944f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java
@@ -47,7 +47,7 @@ public class TaskPauseStateAction extends
AbstractTaskStateAction {
final TaskStartLifecycleEvent taskStartEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable,
taskExecutionRunnable);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/ContinueWorkflowFailureStrategy.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/ContinueWorkflowFailureStrategy.java
new file mode 100644
index 0000000000..927a19b8bf
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/ContinueWorkflowFailureStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+/**
+ * The strategy used to deal with {@link
org.apache.dolphinscheduler.common.enums.FailureStrategy#CONTINUE} when task
failure occurs.
+ * <p> Active tasks are not killed; the workflow waits for them to finish.
+ */
+public class ContinueWorkflowFailureStrategy implements
IWorkflowFailureStrategy {
+
+ @Override
+ public void onTaskFailure(IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ ITaskExecutionRunnable taskExecutionRunnable) {
+ // do nothing, just continue workflow execution
+ }
+ // Successors are always triggered, even after a task failure.
+ @Override
+ public boolean canTriggerSuccessor(IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ ITaskExecutionRunnable
taskExecutionRunnable) {
+ return true;
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/EndWorkflowFailureStrategy.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/EndWorkflowFailureStrategy.java
new file mode 100644
index 0000000000..142f0dc982
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/EndWorkflowFailureStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The strategy used to deal with {@link
org.apache.dolphinscheduler.common.enums.FailureStrategy#END} when task failure
occurs.
+ * <p> Will kill the active tasks and end the workflow execution.
+ */
+@Slf4j
+public class EndWorkflowFailureStrategy implements IWorkflowFailureStrategy {
+ // NOTE(review): default killActiveTasks() sets no workflow MDC; confirm.
+ @Override
+ public void onTaskFailure(IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ ITaskExecutionRunnable taskExecutionRunnable) {
+ log.info("The workflow instance: [{}] using END failure strategy, will
kill the active tasks.",
+ workflowExecutionRunnable.getName());
+ workflowExecutionRunnable.killActiveTasks();
+ }
+ // Blocks successors once any failure chain exists in the execution graph.
+ @Override
+ public boolean canTriggerSuccessor(IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ ITaskExecutionRunnable
taskExecutionRunnable) {
+ return
!workflowExecutionRunnable.getWorkflowExecutionGraph().isExistFailureTaskExecutionRunnableChain();
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/IWorkflowFailureStrategy.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/IWorkflowFailureStrategy.java
new file mode 100644
index 0000000000..bb6f36176a
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/IWorkflowFailureStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+
+/**
+ * Used to deal with {@link
org.apache.dolphinscheduler.common.enums.FailureStrategy} when a task failure
occurs.
+ */
+public interface IWorkflowFailureStrategy {
+ // Called when a task has failed and has no retries left.
+ void onTaskFailure(IWorkflowExecutionRunnable workflowExecutionRunnable,
+ ITaskExecutionRunnable taskExecutionRunnable);
+ // Return false to skip triggering the finished task's successors.
+ boolean canTriggerSuccessor(IWorkflowExecutionRunnable
workflowExecutionRunnable,
+ ITaskExecutionRunnable taskExecutionRunnable);
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/WorkflowFailureStrategyFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/WorkflowFailureStrategyFactory.java
new file mode 100644
index 0000000000..faba884bbf
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/policy/WorkflowFailureStrategyFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
+
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+
+public class WorkflowFailureStrategyFactory {
+ // END -> EndWorkflowFailureStrategy; anything else (null too) -> Continue.
+ public static IWorkflowFailureStrategy getStrategy(final FailureStrategy
failureStrategy) {
+ if (failureStrategy == FailureStrategy.END) {
+ return new EndWorkflowFailureStrategy();
+ }
+ return new ContinueWorkflowFailureStrategy();
+ }
+ // NOTE(review): new instance per call; both strategies hold no state.
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
index 199edd645a..c48d059592 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java
@@ -21,7 +21,9 @@ import
org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
import
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
import java.util.List;
@@ -68,6 +70,13 @@ public interface IWorkflowExecutionRunnable {
return workflowExecutionStatus == WorkflowExecutionStatus.READY_STOP;
}
+ /**
+ * Kill the active tasks of the WorkflowExecutionRunnable.
+ */
+ default void killActiveTasks() {
+
getWorkflowExecutionGraph().getActiveTaskExecutionRunnable().forEach(ITaskExecutionRunnable::kill);
+ }
+
/**
* Get the WorkflowExecuteContext belongs to the WorkflowExecutionRunnable.
*/
@@ -111,4 +120,5 @@ public interface IWorkflowExecutionRunnable {
*/
void registerWorkflowInstanceLifecycleListener(IWorkflowLifecycleListener
listener);
+ IWorkflowFailureStrategy getWorkflowFailureStrategy();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
index 7fdd59b2e6..c819fecf80 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java
@@ -23,8 +23,11 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.policy.WorkflowFailureStrategyFactory;
import
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
+import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
@@ -39,9 +42,15 @@ public class WorkflowExecutionRunnable implements
IWorkflowExecutionRunnable {
@Getter
private final List<IWorkflowLifecycleListener>
workflowInstanceLifecycleListeners;
+ @Getter
+ private final IWorkflowFailureStrategy workflowFailureStrategy;
+
public WorkflowExecutionRunnable(WorkflowExecutionRunnableBuilder
workflowExecutionRunnableBuilder) {
this.workflowExecuteContext =
workflowExecutionRunnableBuilder.getWorkflowExecuteContextBuilder().build();
- this.workflowInstanceLifecycleListeners =
workflowExecuteContext.getWorkflowInstanceLifecycleListeners();
+ this.workflowInstanceLifecycleListeners =
+ new
ArrayList<>(workflowExecuteContext.getWorkflowInstanceLifecycleListeners());
+ this.workflowFailureStrategy = WorkflowFailureStrategyFactory
+
.getStrategy(workflowExecuteContext.getWorkflowInstance().getFailureStrategy());
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index 488e3aabe9..459abc8a64 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -34,6 +34,7 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
+import
org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
@@ -103,18 +104,6 @@ public abstract class AbstractWorkflowStateAction
implements IWorkflowStateActio
}
}
- protected void killActiveTask(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
- try {
-
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
- workflowExecutionRunnable
- .getWorkflowExecutionGraph()
- .getActiveTaskExecutionRunnable()
- .forEach(ITaskExecutionRunnable::kill);
- } finally {
- LogUtils.removeWorkflowInstanceIdMDC();
- }
- }
-
protected void pauseActiveTask(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
try {
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
@@ -129,21 +118,25 @@ public abstract class AbstractWorkflowStateAction
implements IWorkflowStateActio
protected void tryToTriggerSuccessorsAfterTaskFinish(final
IWorkflowExecutionRunnable workflowExecutionRunnable,
final
ITaskExecutionRunnable taskExecutionRunnable) {
+ successorFlowAdjuster.adjustSuccessorFlow(taskExecutionRunnable);
+
+ final IWorkflowFailureStrategy workflowFailureStrategy =
workflowExecutionRunnable.getWorkflowFailureStrategy();
+ if (taskExecutionRunnable.isFailure()) {
+ workflowFailureStrategy.onTaskFailure(workflowExecutionRunnable,
taskExecutionRunnable);
+ }
+
final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
if (workflowExecutionGraph.isEndOfTaskChain(taskExecutionRunnable)) {
emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
return;
}
- successorFlowAdjuster.adjustSuccessorFlow(taskExecutionRunnable);
- final List<ITaskExecutionRunnable> successors =
workflowExecutionGraph.getSuccessors(taskExecutionRunnable);
- if (successors.isEmpty()) {
- log.debug("The task: {} has no successor, try to emit workflow
finished event",
- taskExecutionRunnable.getName());
+ if
(!workflowFailureStrategy.canTriggerSuccessor(workflowExecutionRunnable,
taskExecutionRunnable)) {
emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
return;
}
- triggerTasks(workflowExecutionRunnable, successors);
+
+ triggerTasks(workflowExecutionRunnable,
workflowExecutionGraph.getSuccessors(taskExecutionRunnable));
}
protected void workflowFinish(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
index ee6867ed5f..7cff982e07 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
@@ -85,7 +85,7 @@ public class WorkflowReadyStopStateAction extends
AbstractWorkflowStateAction {
public void onStopEvent(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final WorkflowStopLifecycleEvent
workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- super.killActiveTask(workflowExecutionRunnable);
+ workflowExecutionRunnable.killActiveTasks();
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
index b14b9353b4..16839f378a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
@@ -88,7 +88,7 @@ public class WorkflowRunningStateAction extends
AbstractWorkflowStateAction {
final WorkflowStopLifecycleEvent
workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.transformWorkflowInstanceState(workflowExecutionRunnable,
WorkflowExecutionStatus.READY_STOP);
- super.killActiveTask(workflowExecutionRunnable);
+ workflowExecutionRunnable.killActiveTasks();
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
index 67e83b9495..0727ff5e98 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java
@@ -26,7 +26,9 @@ import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecution
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -91,7 +93,7 @@ public class WorkflowExecuteContext implements
IWorkflowExecuteContext {
workflowGraph,
workflowExecutionGraph,
workflowEventBus,
- workflowInstanceLifecycleListeners);
+
Optional.ofNullable(workflowInstanceLifecycleListeners).orElse(Collections.emptyList()));
}
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index b3bf673b1f..aaac0b459d 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.integration;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -65,6 +66,7 @@ public class WorkflowOperator {
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
.dryRun(workflowTriggerDTO.getDryRun())
.taskDependType(workflowTriggerDTO.getTaskDependType())
+ .failureStrategy(workflowTriggerDTO.getFailureStrategy())
.build();
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -155,6 +157,9 @@ public class WorkflowOperator {
@Builder.Default
private TaskDependType taskDependType = TaskDependType.TASK_POST;
+
+ @Builder.Default
+ private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
}
@Data
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index a2f0331f34..3c628189df 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -20,6 +20,7 @@ package
org.apache.dolphinscheduler.server.master.integration.cases;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@@ -250,6 +251,45 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+    /**
+     * Starts a workflow with three parallel tasks (A, B, C) using the END failure strategy.
+     * <p>
+     * In the fixture, A and B run {@code sleep 60} while C runs the script {@code xx} and is expected to fail.
+     * With {@link FailureStrategy#END}, the failure of C must kill the sibling tasks A and B and finish the
+     * workflow instance in {@link WorkflowExecutionStatus#FAILURE}.
+     */
+    @Test
+    @DisplayName("Test start a workflow with three parallel fake tasks(A, B, C) using END failure strategy")
+    public void testStartWorkflow_with_threeFakeTask_usingFailureStrategyEnd() {
+        final String yaml = "/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml";
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                // Override the DTO default (CONTINUE) so a single task failure ends the whole workflow.
+                .failureStrategy(FailureStrategy.END)
+                .build();
+        final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    // C fails, so the END strategy must mark the workflow instance as failed ...
+                    assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+                            .isEqualTo(WorkflowExecutionStatus.FAILURE);
+                    // ... and kill the sibling tasks A and B instead of letting them finish their sleep.
+                    Assertions.assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(3)
+                            .anySatisfy(taskInstance -> {
+                                assertThat(taskInstance.getName()).isEqualTo("A");
+                                assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                assertThat(taskInstance.getName()).isEqualTo("B");
+                                assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                assertThat(taskInstance.getName()).isEqualTo("C");
+                                assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                            });
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
@Test
@DisplayName("Test start a workflow with two fake task(A) using task
group")
public void testStartWorkflow_with_successTaskUsingTaskGroup() {
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml
new file mode 100644
index 0000000000..8d459e1732
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_three_parallel_fake_task
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with three parallel tasks, where task C fails to trigger the failure strategy
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 60"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 60"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: C
+ code: 3
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"xx"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 3
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
index 5a7c42bbc6..7ea0d60b1b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
@@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum TaskExecutionStatus {
+ // todo: refactor the state like WorkflowExecutionStatus
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
PAUSE(3, "pause"),