This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b865612c0 [Fix-17597][Master] Fix condition task node not run in 
workflow when pre-task node failed (#17606)
2b865612c0 is described below

commit 2b865612c0632cea1ca1d0b81b7562e9eca579c1
Author: sanfeng-lhh <[email protected]>
AuthorDate: Wed Oct 29 23:07:38 2025 +0800

    [Fix-17597][Master] Fix condition task node not run in workflow when 
pre-task node failed (#17606)
---
 .../engine/graph/WorkflowExecutionGraph.java       |   6 +-
 .../task/statemachine/AbstractTaskStateAction.java |   1 +
 .../integration/cases/WorkflowStartTestCase.java   |  80 ++++++++++++++
 ...tion_task_with_one_fake_predecessor_failed.yaml | 122 +++++++++++++++++++++
 ...ion_task_with_one_fake_predecessor_success.yaml | 122 +++++++++++++++++++++
 5 files changed, 329 insertions(+), 2 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
index c21dd08c13..973f0ea935 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
@@ -290,7 +290,8 @@ public class WorkflowExecutionGraph implements 
IWorkflowExecutionGraph {
         return successors.get(taskExecutionRunnable.getName()).isEmpty()
                 || isTaskExecutionRunnableKilled(taskExecutionRunnable)
                 || isTaskExecutionRunnablePaused(taskExecutionRunnable)
-                || isTaskExecutionRunnableFailed(taskExecutionRunnable);
+                || (isTaskExecutionRunnableFailed(taskExecutionRunnable)
+                        && 
!isAllSuccessorsAreConditionTask(taskExecutionRunnable));
     }
 
     @Override
@@ -335,7 +336,8 @@ public class WorkflowExecutionGraph implements 
IWorkflowExecutionGraph {
         }
         return successors.stream().allMatch(
                 successor -> isTaskExecutionRunnableSkipped(successor)
-                        || 
TaskTypeUtils.isConditionTask(taskExecutionRunnable.getTaskInstance().getTaskType()));
+                        || 
(TaskTypeUtils.isConditionTask(successor.getTaskDefinition().getTaskType())
+                                && 
!isTaskExecutionRunnableForbidden(successor)));
     }
 
     private void assertTaskExecutionRunnableState(final ITaskExecutionRunnable 
taskExecutionRunnable,
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 6a28154d83..e451cc78a7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -178,6 +178,7 @@ public abstract class AbstractTaskStateAction implements 
ITaskStateAction {
         // And the DAG will continue to execute.
         final IWorkflowExecutionGraph workflowExecutionGraph = 
taskExecutionRunnable.getWorkflowExecutionGraph();
         if 
(workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) 
{
+            mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, 
taskExecutionRunnable);
             
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
             return;
         }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 366902553a..8a9e443910 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -1103,4 +1103,84 @@ public class WorkflowStartTestCase extends 
AbstractMasterIntegrationTestCase {
                 });
         masterContainer.assertAllResourceReleased();
     }
+
+    @Test
+    @DisplayName("Test start a workflow with one condition task(B) when one 
fake predecessor task(A) run success")
+    void 
testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runSuccess() {
+        final String yaml = 
"/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(parentWorkflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    Assertions
+                            
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+
+                    Assertions
+                            
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .hasSize(3)
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("A");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("B");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("C");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            });
+                });
+        masterContainer.assertAllResourceReleased();
+    }
+
+    @Test
+    @DisplayName("Test start a workflow with one condition task(B) when one 
fake predecessor task(A) run failed")
+    void 
testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
+        final String yaml = 
"/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(parentWorkflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    Assertions
+                            
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+
+                    Assertions
+                            
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .hasSize(3)
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("A");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("B");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("D");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            });
+                });
+        masterContainer.assertAllResourceReleased();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml
new file mode 100644
index 0000000000..fd9b22df5f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A(failed) -> B(success) -> D(success)
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_one_condition_task_with_one_fake_predecessor_failed
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with one condition task which has one 
predecessor failed
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"error"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: B
+    code: 2
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: CONDITIONS
+    taskParams: 
'{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: C
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo 
success"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: D
+    code: 4
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 1
+    preTaskVersion: 1
+    postTaskCode: 2
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 2
+    preTaskVersion: 1
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 2
+    preTaskVersion: 1
+    postTaskCode: 4
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml
new file mode 100644
index 0000000000..7aacd7b202
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A(success) -> B(success) -> C(success)
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_one_condition_task_with_one_fake_predecessor_success
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with one condition task which has one 
predecessor success
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo 
success"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: B
+    code: 2
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: CONDITIONS
+    taskParams: 
'{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: C
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo 
success"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: D
+    code: 4
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 1
+    preTaskVersion: 1
+    postTaskCode: 2
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 2
+    preTaskVersion: 1
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 2
+    preTaskVersion: 1
+    postTaskCode: 4
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00

Reply via email to