This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7150b42a3c [Fix-17453] Fix TASK_ONLY strategy cannot work (#17461)
7150b42a3c is described below

commit 7150b42a3c388b8a19a21a305bf5b2e20d3f7bed
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Sep 15 18:34:06 2025 +0800

    [Fix-17453] Fix TASK_ONLY strategy cannot work (#17461)
---
 .../master/command/RunWorkflowCommandParam.java    |  7 ++
 .../handler/RecoverFailureTaskCommandHandler.java  |  1 +
 .../command/handler/RunWorkflowCommandHandler.java |  1 +
 .../handler/WorkflowFailoverCommandHandler.java    |  1 +
 .../engine/graph/IWorkflowExecutionGraph.java      |  5 ++
 .../engine/graph/WorkflowExecutionGraph.java       | 23 +++++++
 .../master/integration/WorkflowOperator.java       |  7 +-
 .../integration/cases/WorkflowStartTestCase.java   | 33 +++++++++
 .../it/start/workflow_with_task_only_strategy.yaml | 80 ++++++++++++++++++++++
 9 files changed, 157 insertions(+), 1 deletion(-)

diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
index 960a4f7177..7ebc7347ff 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.extract.master.command;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
 
+import java.util.List;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
@@ -35,4 +37,9 @@ public class RunWorkflowCommandParam extends 
AbstractCommandParam {
         return CommandType.START_PROCESS;
     }
 
+    public RunWorkflowCommandParam withStartNodes(List<Long> startNodes) {
+        this.startNodes = startNodes;
+        return this;
+    }
+
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
index 7ef570eab8..92f5fb4ed0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
@@ -139,6 +139,7 @@ public class RecoverFailureTaskCommandHandler extends 
AbstractCommandHandler {
                         .doVisitFunction(taskExecutionRunnableCreator)
                         .build();
         workflowGraphTopologyLogicalVisitor.visit();
+        workflowExecutionGraph.removeUnReachableEdge();
 
         
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
index 2e1bf83562..a127ae6cea 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
@@ -116,6 +116,7 @@ public class RunWorkflowCommandHandler extends 
AbstractCommandHandler {
                         .doVisitFunction(taskExecutionRunnableCreator)
                         .build();
         workflowGraphTopologyLogicalVisitor.visit();
+        workflowExecutionGraph.removeUnReachableEdge();
 
         
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 093d61e26e..8348375b6e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -137,6 +137,7 @@ public class WorkflowFailoverCommandHandler extends 
AbstractCommandHandler {
                         .doVisitFunction(taskExecutionRunnableCreator)
                         .build();
         workflowGraphTopologyLogicalVisitor.visit();
+        workflowExecutionGraph.removeUnReachableEdge();
 
         
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
index fdbbb581da..f94641b4a3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
@@ -40,6 +40,11 @@ public interface IWorkflowExecutionGraph {
      */
     void addEdge(final String fromTaskName, final Set<String> toTaskName);
 
+    /**
+     * Remove the edges whose source or target task is not contained in the execution graph.
+     */
+    void removeUnReachableEdge();
+
     /**
      * Return the start tasks, the start tasks in the workflow execution graph 
is the tasks which predecessors is empty.
      */
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
index 0fa4d15cac..c21dd08c13 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
@@ -28,13 +28,16 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 public class WorkflowExecutionGraph implements IWorkflowExecutionGraph {
 
+    // Store all the task execution runnable in the execution graph.
     private final Map<String, ITaskExecutionRunnable> 
totalTaskExecuteRunnableMap;
 
     private final Set<String> failureTaskChains;
@@ -78,6 +81,26 @@ public class WorkflowExecutionGraph implements 
IWorkflowExecutionGraph {
         toTaskNames.forEach(toTask -> predecessors.computeIfAbsent(toTask, k 
-> new HashSet<>()).add(fromTaskName));
     }
 
+    @Override
+    public void removeUnReachableEdge() {
+        // If a node referenced in successors or predecessors is not in 
totalTaskExecuteRunnableMap,
+        // it means that the node is not executable, so we need to filter it 
out.
+        Consumer<Map<String, Set<String>>> removeUnReachableEdge = edgeMap -> {
+            final Iterator<Map.Entry<String, Set<String>>> iterator = 
edgeMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, Set<String>> entry = iterator.next();
+                if (!totalTaskExecuteRunnableMap.containsKey(entry.getKey())) {
+                    iterator.remove();
+                    continue;
+                }
+                Set<String> toTasks = entry.getValue();
+                toTasks.removeIf(toTask -> 
!totalTaskExecuteRunnableMap.containsKey(toTask));
+            }
+        };
+        removeUnReachableEdge.accept(successors);
+        removeUnReachableEdge.accept(predecessors);
+    }
+
     @Override
     public List<ITaskExecutionRunnable> getStartNodes() {
         return totalTaskExecuteRunnableMap.values()
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index c133e29a0d..b3bf673b1f 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.integration;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -62,7 +63,8 @@ public class WorkflowOperator {
                 
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
                 
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
                 
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
-                .dryRun(workflowTriggerDTO.dryRun)
+                .dryRun(workflowTriggerDTO.getDryRun())
+                .taskDependType(workflowTriggerDTO.getTaskDependType())
                 .build();
 
         final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -150,6 +152,9 @@ public class WorkflowOperator {
 
         @Builder.Default
         private Flag dryRun = Flag.NO;
+
+        @Builder.Default
+        private TaskDependType taskDependType = TaskDependType.TASK_POST;
     }
 
     @Data
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 530b6b7d6f..b0acc01449 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
 import static org.awaitility.Awaitility.await;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -1035,4 +1036,36 @@ public class WorkflowStartTestCase extends 
AbstractMasterIntegrationTestCase {
                 });
         masterContainer.assertAllResourceReleased();
     }
+
+    @Test
+    @DisplayName("Test start a workflow with task depend type TASK_ONLY")
+    public void testStartWorkflow_withTaskOnlyStrategy() {
+        final String yaml = "/it/start/workflow_with_task_only_strategy.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new 
RunWorkflowCommandParam().withStartNodes(Lists.newArrayList(1L)))
+                .taskDependType(TaskDependType.TASK_ONLY)
+                .build();
+        workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    Assertions
+                            
.assertThat(repository.queryWorkflowInstance(workflow))
+                            .satisfiesExactly(workflowInstance -> 
assertThat(workflowInstance.getState())
+                                    
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(1)
+                            .satisfiesExactly(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("A");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            });
+                });
+        masterContainer.assertAllResourceReleased();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_only_strategy.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_only_strategy.yaml
new file mode 100644
index 0000000000..1ff9e24173
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_only_strategy.yaml
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_two_serial_fake_task_success
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with two serial tasks
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: B
+    code: 2
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 1
+    preTaskVersion: 1
+    postTaskCode: 2
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00

Reply via email to