This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b865612c0 [Fix-17597][Master] Fix condition task node not run in
workflow when pre-task node failed (#17606)
2b865612c0 is described below
commit 2b865612c0632cea1ca1d0b81b7562e9eca579c1
Author: sanfeng-lhh <[email protected]>
AuthorDate: Wed Oct 29 23:07:38 2025 +0800
[Fix-17597][Master] Fix condition task node not run in workflow when
pre-task node failed (#17606)
---
.../engine/graph/WorkflowExecutionGraph.java | 6 +-
.../task/statemachine/AbstractTaskStateAction.java | 1 +
.../integration/cases/WorkflowStartTestCase.java | 80 ++++++++++++++
...tion_task_with_one_fake_predecessor_failed.yaml | 122 +++++++++++++++++++++
...ion_task_with_one_fake_predecessor_success.yaml | 122 +++++++++++++++++++++
5 files changed, 329 insertions(+), 2 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
index c21dd08c13..973f0ea935 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
@@ -290,7 +290,8 @@ public class WorkflowExecutionGraph implements
IWorkflowExecutionGraph {
return successors.get(taskExecutionRunnable.getName()).isEmpty()
|| isTaskExecutionRunnableKilled(taskExecutionRunnable)
|| isTaskExecutionRunnablePaused(taskExecutionRunnable)
- || isTaskExecutionRunnableFailed(taskExecutionRunnable);
+ || (isTaskExecutionRunnableFailed(taskExecutionRunnable)
+ &&
!isAllSuccessorsAreConditionTask(taskExecutionRunnable));
}
@Override
@@ -335,7 +336,8 @@ public class WorkflowExecutionGraph implements
IWorkflowExecutionGraph {
}
return successors.stream().allMatch(
successor -> isTaskExecutionRunnableSkipped(successor)
- ||
TaskTypeUtils.isConditionTask(taskExecutionRunnable.getTaskInstance().getTaskType()));
+ ||
(TaskTypeUtils.isConditionTask(successor.getTaskDefinition().getTaskType())
+ &&
!isTaskExecutionRunnableForbidden(successor)));
}
private void assertTaskExecutionRunnableState(final ITaskExecutionRunnable
taskExecutionRunnable,
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 6a28154d83..e451cc78a7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -178,6 +178,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
// And the DAG will continue to execute.
final IWorkflowExecutionGraph workflowExecutionGraph =
taskExecutionRunnable.getWorkflowExecutionGraph();
if
(workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable))
{
+ mergeTaskVarPoolToWorkflow(workflowExecutionRunnable,
taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
return;
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 366902553a..8a9e443910 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -1103,4 +1103,84 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
});
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test start a workflow with one condition task(B) when one
fake predecessor task(A) run success")
+ void
testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runSuccess() {
+ final String yaml =
"/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(parentWorkflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+
+ Assertions
+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .hasSize(3)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("B");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("C");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow with one condition task(B) when one
fake predecessor task(A) run failed")
+ void
testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
+ final String yaml =
"/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(parentWorkflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+
+ Assertions
+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .hasSize(3)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("B");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("D");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml
new file mode 100644
index 0000000000..fd9b22df5f
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A(failed) -> B(success) -> D(success)
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_condition_task_with_one_fake_predecessor_failed
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with one condition task which has one
predecessor failed
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"error"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: CONDITIONS
+ taskParams:
'{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: C
+ code: 3
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: D
+ code: 4
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 1
+ preTaskVersion: 1
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 2
+ preTaskVersion: 1
+ postTaskCode: 3
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 2
+ preTaskVersion: 1
+ postTaskCode: 4
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml
new file mode 100644
index 0000000000..7aacd7b202
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A(success) -> B(success) -> C(success)
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_condition_task_with_one_fake_predecessor_success
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with one condition task which has one
predecessor success
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: CONDITIONS
+ taskParams:
'{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: C
+ code: 3
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo
success"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: D
+ code: 4
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 1
+ preTaskVersion: 1
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 2
+ preTaskVersion: 1
+ postTaskCode: 3
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 2
+ preTaskVersion: 1
+ postTaskCode: 4
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00