This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 24e599760f [Fix-17600][Master] Only mark task as inactive in
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent (#17601)
24e599760f is described below
commit 24e599760f6b6d20d2a9c2c415243bc2f05840f9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Oct 27 15:52:31 2025 +0800
[Fix-17600][Master] Only mark task as inactive in
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent (#17601)
---
.../task/statemachine/AbstractTaskStateAction.java | 1 -
.../statemachine/AbstractWorkflowStateAction.java | 35 ++++--
.../WorkflowReadyPauseStateAction.java | 22 +++-
.../statemachine/WorkflowReadyStopStateAction.java | 23 +++-
.../statemachine/WorkflowRunningStateAction.java | 23 ++--
.../integration/cases/WorkflowStartTestCase.java | 35 ++++++
...workflow_with_task_successors_is_forbidden.yaml | 124 +++++++++++++++++++++
7 files changed, 231 insertions(+), 32 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 1fd82f7f36..6a28154d83 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -244,7 +244,6 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(final
ITaskExecutionRunnable taskExecutionRunnable) {
final Integer workflowInstanceId =
taskExecutionRunnable.getWorkflowInstance().getId();
final IWorkflowExecutionRunnable workflowExecutionRunnable =
workflowRepository.get(workflowInstanceId);
-
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableInActive(taskExecutionRunnable);
taskExecutionRunnable
.getWorkflowEventBus()
.publish(
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index 9c62a997ac..e8596c0dcc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -39,6 +39,7 @@ import
org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
import org.apache.commons.collections4.CollectionUtils;
+import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@@ -70,28 +71,27 @@ public abstract class AbstractWorkflowStateAction
implements IWorkflowStateActio
* <p> If none of the given tasks meets its trigger condition, nothing is
published; emitting the workflow finished event is left to the caller.
*/
protected void triggerTasks(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final List<ITaskExecutionRunnable>
taskExecutionRunnables) {
+ final List<ITaskExecutionRunnable>
triggerCandidateTasks) {
final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
- final List<ITaskExecutionRunnable> readyTaskExecutionRunnableList =
taskExecutionRunnables
+ final List<ITaskExecutionRunnable> readyToTriggerTasks =
triggerCandidateTasks
.stream()
.filter(workflowExecutionGraph::isTriggerConditionMet)
+ .sorted(Comparator.comparing(ITaskExecutionRunnable::getName))
.collect(Collectors.toList());
- if (CollectionUtils.isEmpty(readyTaskExecutionRunnableList)) {
- emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
+ if (CollectionUtils.isEmpty(readyToTriggerTasks)) {
return;
}
final WorkflowEventBus workflowEventBus =
workflowExecutionRunnable.getWorkflowEventBus();
- for (ITaskExecutionRunnable readyTaskExecutionRunnable :
readyTaskExecutionRunnableList) {
-
workflowExecutionGraph.markTaskExecutionRunnableActive(readyTaskExecutionRunnable);
- if
(workflowExecutionGraph.isTaskExecutionRunnableSkipped(readyTaskExecutionRunnable)
- ||
workflowExecutionGraph.isTaskExecutionRunnableForbidden(readyTaskExecutionRunnable))
{
-
workflowExecutionGraph.markTaskExecutionRunnableInActive(readyTaskExecutionRunnable);
+ for (ITaskExecutionRunnable readyToTriggerTask : readyToTriggerTasks) {
+
workflowExecutionGraph.markTaskExecutionRunnableActive(readyToTriggerTask);
+ if
(workflowExecutionGraph.isTaskExecutionRunnableSkipped(readyToTriggerTask)
+ ||
workflowExecutionGraph.isTaskExecutionRunnableForbidden(readyToTriggerTask)) {
workflowEventBus.publish(
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent.of(
- workflowExecutionRunnable,
readyTaskExecutionRunnable));
+ workflowExecutionRunnable,
readyToTriggerTask));
continue;
}
-
workflowEventBus.publish(TaskStartLifecycleEvent.of(readyTaskExecutionRunnable));
+
workflowEventBus.publish(TaskStartLifecycleEvent.of(readyToTriggerTask));
}
}
@@ -128,7 +128,14 @@ public abstract class AbstractWorkflowStateAction
implements IWorkflowStateActio
}
successorFlowAdjuster.adjustSuccessorFlow(taskExecutionRunnable);
- triggerTasks(workflowExecutionRunnable,
workflowExecutionGraph.getSuccessors(taskExecutionRunnable));
+ final List<ITaskExecutionRunnable> successors =
workflowExecutionGraph.getSuccessors(taskExecutionRunnable);
+ if (successors.isEmpty()) {
+ log.debug("The task: {} has no successor, try to emit workflow
finished event",
+ taskExecutionRunnable.getName());
+ emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
+ return;
+ }
+ triggerTasks(workflowExecutionRunnable, successors);
}
protected void workflowFinish(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
@@ -165,6 +172,10 @@ public abstract class AbstractWorkflowStateAction
implements IWorkflowStateActio
*/
protected abstract void emitWorkflowFinishedEventIfApplicable(final
IWorkflowExecutionRunnable workflowExecutionRunnable);
+ protected boolean isWorkflowFinishable(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
+ return
workflowExecutionRunnable.getWorkflowExecutionGraph().isAllTaskExecutionRunnableChainFinish();
+ }
+
/**
* Assert that the state of the task is the expected state.
*
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyPauseStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyPauseStateAction.java
index 8d677a3190..e71473c043 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyPauseStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyPauseStateAction.java
@@ -20,6 +20,7 @@ package
org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
@@ -31,6 +32,8 @@ import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -45,15 +48,23 @@ public class WorkflowReadyPauseStateAction extends
AbstractWorkflowStateAction {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph();
- triggerTasks(workflowExecutionRunnable,
workflowExecutionGraph.getStartNodes());
+ final List<ITaskExecutionRunnable> startNodes =
workflowExecutionGraph.getStartNodes();
+ if (startNodes.isEmpty()) {
+ log.info("Workflow start node is empty, try to emit workflow
finished event");
+ emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
+ return;
+ }
+ triggerTasks(workflowExecutionRunnable, startNodes);
}
@Override
public void onTopologyLogicalTransitionEvent(final
IWorkflowExecutionRunnable workflowExecutionRunnable,
final
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent
workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
-
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable());
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable();
+
workflowExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableInActive(taskExecutionRunnable);
+ super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
taskExecutionRunnable);
}
@Override
@@ -113,12 +124,11 @@ public class WorkflowReadyPauseStateAction extends
AbstractWorkflowStateAction {
@Override
protected void emitWorkflowFinishedEventIfApplicable(final
IWorkflowExecutionRunnable workflowExecutionRunnable) {
- final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
- if (!workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
+ if (!isWorkflowFinishable(workflowExecutionRunnable)) {
log.debug("There exist task which is not finish, don't need to
emit workflow finished event");
return;
}
-
+ final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
final WorkflowEventBus workflowEventBus =
workflowExecutionRunnable.getWorkflowEventBus();
if (workflowExecutionGraph.isExistPausedTaskExecutionRunnableChain()) {
workflowEventBus.publish(WorkflowPausedLifecycleEvent.of(workflowExecutionRunnable));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
index b651d2223d..ee6867ed5f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
@@ -20,6 +20,7 @@ package
org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
@@ -31,6 +32,8 @@ import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -45,15 +48,23 @@ public class WorkflowReadyStopStateAction extends
AbstractWorkflowStateAction {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph();
- triggerTasks(workflowExecutionRunnable,
workflowExecutionGraph.getStartNodes());
+ final List<ITaskExecutionRunnable> startNodes =
workflowExecutionGraph.getStartNodes();
+ if (startNodes.isEmpty()) {
+ log.info("Workflow start node is empty, try to emit workflow
finished event");
+ emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
+ return;
+ }
+ triggerTasks(workflowExecutionRunnable, startNodes);
}
@Override
public void onTopologyLogicalTransitionEvent(final
IWorkflowExecutionRunnable workflowExecutionRunnable,
final
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent
workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
-
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable());
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable();
+
workflowExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableInActive(taskExecutionRunnable);
+ super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
taskExecutionRunnable);
}
@Override
@@ -112,13 +123,13 @@ public class WorkflowReadyStopStateAction extends
AbstractWorkflowStateAction {
@Override
protected void emitWorkflowFinishedEventIfApplicable(final
IWorkflowExecutionRunnable workflowExecutionRunnable) {
- final IWorkflowExecutionGraph workflowExecutionGraph =
-
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph();
- if (!workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
+ if (!isWorkflowFinishable(workflowExecutionRunnable)) {
log.debug("There exist task which is not finish, don't need to
emit workflow finished event");
return;
}
+ final IWorkflowExecutionGraph workflowExecutionGraph =
+
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph();
final WorkflowEventBus workflowEventBus =
workflowExecutionRunnable.getWorkflowEventBus();
if (workflowExecutionGraph.isExistKilledTaskExecutionRunnableChain()) {
workflowEventBus.publish(WorkflowStoppedLifecycleEvent.of(workflowExecutionRunnable));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
index 469736cea5..b14b9353b4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
@@ -20,6 +20,7 @@ package
org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
@@ -31,6 +32,8 @@ import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -45,7 +48,13 @@ public class WorkflowRunningStateAction extends
AbstractWorkflowStateAction {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph();
- triggerTasks(workflowExecutionRunnable,
workflowExecutionGraph.getStartNodes());
+ final List<ITaskExecutionRunnable> startNodes =
workflowExecutionGraph.getStartNodes();
+ if (startNodes.isEmpty()) {
+ log.info("Workflow start node is empty, try to emit workflow
finished event");
+ emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
+ return;
+ }
+ triggerTasks(workflowExecutionRunnable, startNodes);
}
@Override
@@ -53,8 +62,10 @@ public class WorkflowRunningStateAction extends
AbstractWorkflowStateAction {
final
IWorkflowExecutionRunnable workflowExecutionRunnable,
final
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent
workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
-
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable());
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable();
+
workflowExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableInActive(taskExecutionRunnable);
+ super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
taskExecutionRunnable);
}
@Override
@@ -135,13 +146,11 @@ public class WorkflowRunningStateAction extends
AbstractWorkflowStateAction {
*/
@Override
protected void
emitWorkflowFinishedEventIfApplicable(IWorkflowExecutionRunnable
workflowExecutionRunnable) {
- final IWorkflowExecutionGraph workflowExecutionGraph =
-
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph();
- if (!workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
+ if (!isWorkflowFinishable(workflowExecutionRunnable)) {
log.debug("There exist task which is not finish, don't need to
emit workflow finished event");
return;
}
-
+ final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
final WorkflowEventBus workflowEventBus =
workflowExecutionRunnable.getWorkflowEventBus();
if (workflowExecutionGraph.isExistFailureTaskExecutionRunnableChain())
{
workflowEventBus.publish(WorkflowFailedLifecycleEvent.of(workflowExecutionRunnable));
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index b0acc01449..366902553a 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.time.DateUtils;
import java.time.Duration;
import java.util.List;
+import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
@@ -1068,4 +1069,38 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
});
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test start a workflow with task which successors is
forbidden")
+ public void testStartWorkflow_withTaskSuccessorsIsForbidden() {
+ final String yaml =
"/it/start/workflow_with_task_successors_is_forbidden.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(2)
+ .satisfiesExactly(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ }, (Consumer<TaskInstance>) taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("C1");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_successors_is_forbidden.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_successors_is_forbidden.yaml
new file mode 100644
index 0000000000..81a3c45d37
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_successors_is_forbidden.yaml
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_task_successors_is_forbidden
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with task which successors is
forbidden
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ flag: YES
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo A"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B1
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ flag: NO
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo B1"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B2
+ code: 3
+ version: 1
+ projectCode: 1
+ userId: 1
+ flag: NO
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo B2"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: C1
+ code: 4
+ version: 1
+ projectCode: 1
+ userId: 1
+ flag: YES
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo C1"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 1
+ preTaskVersion: 1
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 3
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 3
+ preTaskVersion: 1
+ postTaskCode: 4
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00