This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5d63e417ba [Fix-17350] Fix issues of sub-workflow failover from
different master server (#17352)
5d63e417ba is described below
commit 5d63e417ba17935f0bddb91366aefa46e6f20c99
Author: lile <[email protected]>
AuthorDate: Fri Jul 25 09:38:40 2025 +0800
[Fix-17350] Fix issues of sub-workflow failover from different master
server (#17352)
---
.../common/enums/WorkflowExecutionStatus.java | 16 +-
.../dao/mapper/WorkflowInstanceMapper.java | 10 -
.../repository/impl/WorkflowInstanceDaoImpl.java | 2 +-
.../dao/mapper/WorkflowInstanceMapper.xml | 16 -
.../plugin/subworkflow/SubWorkflowLogicTask.java | 7 +-
.../cases/WorkflowInstanceFailoverTestCase.java | 477 ++++++++++++++++++++-
...h_sub_workflow_not_running_in_diff_master.yaml} | 150 ++++++-
...h_sub_workflow_not_running_in_diff_master.yaml} | 150 ++++++-
...h_sub_workflow_not_running_in_diff_master.yaml} | 48 +--
...lowInstance_with_sub_workflow_task_running.yaml | 4 +
..._sub_workflow_task_running_in_diff_master.yaml} | 8 +-
11 files changed, 774 insertions(+), 114 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
index 6d96b31bb6..1cf115b4a2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.common.enums;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -94,8 +93,19 @@ public enum WorkflowExecutionStatus {
|| this == SERIAL_WAIT;
}
- public boolean canFailover() {
- return Arrays.stream(NEED_FAILOVER_STATES).anyMatch(x -> x ==
this.getCode());
+ /**
+ * Whether a sub-workflow in this status can be taken over.
+ * @return true if the sub-workflow can be taken over
+ */
+ public boolean canTakeover() {
+ return this == RUNNING_EXECUTION
+ || this == READY_PAUSE
+ || this == PAUSE
+ || this == READY_STOP
+ || this == STOP
+ || this == FAILURE
+ || this == SUCCESS
+ || this == FAILOVER;
}
public boolean canDirectPauseInDB() {
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
index b7a8de4cec..dee3a559b5 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
@@ -56,16 +56,6 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
List<WorkflowInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[]
stateArray);
- /**
- * query workflow instance by host and stateArray which is not sub workflow
- *
- * @param host host
- * @param stateArray stateArray
- * @return workflow instance list
- */
- List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host")
String host,
- @Param("states")
int[] stateArray);
-
/**
* query workflow instance host by stateArray
*
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index 7ec792d864..cfc381ab8a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -176,7 +176,7 @@ public class WorkflowInstanceDaoImpl extends
BaseDao<WorkflowInstance, WorkflowI
@Override
public List<WorkflowInstance> queryNeedFailoverWorkflowInstances(String
masterAddress) {
- return mybatisMapper.queryMainWorkflowByHostAndStatus(masterAddress,
+ return mybatisMapper.queryByHostAndStatus(masterAddress,
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
index 24ca24ab57..fc2fb6acc2 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
@@ -50,22 +50,6 @@
</if>
order by id asc
</select>
- <select id="queryMainWorkflowByHostAndStatus"
resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
- select
- <include refid="baseSql"/>
- from t_ds_workflow_instance
- where is_sub_workflow=0
- <if test="host != null and host != ''">
- and host=#{host}
- </if>
- <if test="states != null and states.length != 0">
- and state in
- <foreach collection="states" item="i" open="(" close=")"
separator=",">
- #{i}
- </foreach>
- </if>
- order by id asc
- </select>
<select id="queryNeedFailoverWorkflowInstanceHost" resultType="String">
select distinct host
from t_ds_workflow_instance
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
index a156760fb5..82c435583f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
@@ -40,7 +40,6 @@ import
org.apache.dolphinscheduler.server.master.engine.executor.plugin.Abstract
import
org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import org.apache.dolphinscheduler.server.master.failover.WorkflowFailover;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
@@ -168,9 +167,9 @@ public class SubWorkflowLogicTask extends
AbstractLogicTask<SubWorkflowParameter
final WorkflowInstance subWorkflowInstance =
workflowInstanceDao.queryById(
subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId());
- if (subWorkflowInstance != null &&
subWorkflowInstance.getState().canFailover()) {
- // Only handle sub-workflow's fail-over in SubWorkflowLogicTask's
fail-over
-
applicationContext.getBean(WorkflowFailover.class).failoverWorkflow(subWorkflowInstance);
+ if (subWorkflowInstance != null &&
subWorkflowInstance.getState().canTakeover()) {
+ // Here we only need to take over the runtime context of
sub-workflow,
+ // the sub-workflow will be failed over by the master-server when needed.
return subWorkflowLogicTaskRuntimeContext;
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
index 2967109ac9..ab6f751df4 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
@@ -21,9 +21,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
@@ -31,8 +33,11 @@ import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowI
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
import
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
+import
org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent;
+import org.apache.dolphinscheduler.server.master.failover.FailoverCoordinator;
import
org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
import org.apache.commons.lang3.StringUtils;
@@ -49,6 +54,9 @@ public class WorkflowInstanceFailoverTestCase extends
AbstractMasterIntegrationT
@Autowired
private SystemEventBus systemEventBus;
+ @Autowired
+ FailoverCoordinator failoverCoordinator;
+
@Test
public void testGlobalFailover_runningWorkflow_withSubmittedTasks() {
final String yaml =
"/it/failover/running_workflowInstance_with_one_submitted_fake_task.yaml";
@@ -633,46 +641,495 @@ public class WorkflowInstanceFailoverTestCase extends
AbstractMasterIntegrationT
await()
.atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> {
assertThat(repository.queryAllWorkflowInstance())
.hasSize(2)
- .anySatisfy(workflowInstance -> {
+ .allSatisfy(workflowInstance -> {
assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
});
});
+ assertThat(repository.queryTaskInstance(mainWorkflow))
+ .hasSize(2)
+ .allSatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+ .isEqualTo(taskInstance.getId() == 1 ?
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+ : TaskExecutionStatus.SUCCESS);
+ assertThat(taskInstance.getName())
+ .isEqualTo("sub_workflow_task");
+ });
+
+ assertThat(repository.queryTaskInstance(subWorkflow))
+ .hasSize(2)
+ .allSatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+ .isEqualTo(taskInstance.getId() == 2 ?
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+ : TaskExecutionStatus.SUCCESS);
+ assertThat(taskInstance.getName())
+ .isEqualTo("fake_task_A");
+ });
+
+ assertThat(repository.queryAllTaskInstance()).hasSize(4);
+
+ masterContainer.assertAllResourceReleased();
+
+ }
+
+ @Test
+ public void
testMasterFailover_runningWorkflow_takeOverSubWorkflowOnParentHealthy() {
+ final String yaml =
"/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_one_sub_workflow_running")).findFirst()
+ .orElse(null);
+ final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_running")).findFirst().orElse(null);
+
+ final WorkflowInstance mainWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow -> workflow.getName()
+
.equals("workflow_with_one_sub_workflow_running-20250424180000000"))
+ .findFirst()
+ .orElse(null);
+ final WorkflowInstance subWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_running-20250424180000000")).findFirst()
+ .orElse(null);
+
+ assertThat(mainWorkflow).isNotNull();
+ assertThat(subWorkflow).isNotNull();
+ assertThat(mainWorkflowInstance).isNotNull();
+ assertThat(subWorkflowInstance).isNotNull();
+
+ MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(mainWorkflowInstance.getHost())
+ .build();
+ MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(subWorkflowInstance.getHost())
+ .build();
+
+ // first start workflow to simulate the normal parent workflow
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new
Date(), 0));
+
+ final String mainMasterFailoverNodePath =
RegistryUtils.getFailoveredNodePath(
+ masterServerMain.getAddress(),
+ masterServerMain.getServerStartupTime(),
+ masterServerMain.getProcessId());
+ // wait failover main-workflow
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+ });
+ // wait main-workflow started
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(mainWorkflow))
+ .hasSize(1)
+ .allSatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+ });
+ });
+
+ // wait sub-workflow-task started
await()
.atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> {
assertThat(repository.queryTaskInstance(mainWorkflow))
.hasSize(2)
- .anySatisfy(taskInstance -> {
+ .allSatisfy(taskInstance -> {
assertThat(taskInstance.getState())
.isEqualTo(taskInstance.getId() == 1 ?
TaskExecutionStatus.NEED_FAULT_TOLERANCE
- : TaskExecutionStatus.SUCCESS);
+ :
TaskExecutionStatus.RUNNING_EXECUTION);
assertThat(taskInstance.getName())
.isEqualTo("sub_workflow_task");
});
});
+ // failover sub-workflow
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new
Date(), 0));
+
await()
.atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> {
- assertThat(repository.queryTaskInstance(subWorkflow))
+ assertThat(repository.queryAllWorkflowInstance())
.hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getState())
- .isEqualTo(taskInstance.getId() == 2 ?
TaskExecutionStatus.NEED_FAULT_TOLERANCE
- : TaskExecutionStatus.SUCCESS);
- assertThat(taskInstance.getName())
- .isEqualTo("fake_task_A");
+ .allSatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
});
});
+ assertThat(repository.queryTaskInstance(mainWorkflow))
+ .hasSize(2)
+ .allSatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+ .isEqualTo(taskInstance.getId() == 1 ?
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+ : TaskExecutionStatus.SUCCESS);
+ assertThat(taskInstance.getName())
+ .isEqualTo("sub_workflow_task");
+ });
+
+ assertThat(repository.queryTaskInstance(subWorkflow))
+ .hasSize(2)
+ .allSatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+ .isEqualTo(taskInstance.getId() == 2 ?
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+ : TaskExecutionStatus.SUCCESS);
+ assertThat(taskInstance.getName())
+ .isEqualTo("fake_task_A");
+ });
+
assertThat(repository.queryAllTaskInstance()).hasSize(4);
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ public void
testMasterFailover_runningWorkflow_takeOverSubWorkflowOnChildHealthy() {
+ final String yaml =
"/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_one_sub_workflow_running")).findFirst()
+ .orElse(null);
+ final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_running")).findFirst().orElse(null);
+
+ final WorkflowInstance mainWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow -> workflow.getName()
+
.equals("workflow_with_one_sub_workflow_running-20250424180000000"))
+ .findFirst()
+ .orElse(null);
+ final WorkflowInstance subWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_running-20250424180000000")).findFirst()
+ .orElse(null);
+
+ assertThat(mainWorkflow).isNotNull();
+ assertThat(subWorkflow).isNotNull();
+ assertThat(mainWorkflowInstance).isNotNull();
+ assertThat(subWorkflowInstance).isNotNull();
+
+ MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(mainWorkflowInstance.getHost())
+ .build();
+ MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(subWorkflowInstance.getHost())
+ .build();
+
+ // first start sub-workflow to simulate the normal child workflow
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new
Date(), 0));
+
+ final String subMasterFailoverNodePath =
RegistryUtils.getFailoveredNodePath(
+ masterServerSub.getAddress(),
+ masterServerSub.getServerStartupTime(),
+ masterServerSub.getProcessId());
+ // wait failover sub-workflow
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+
assertThat(registryClient.exists(subMasterFailoverNodePath)).isTrue();
+ });
+
+ // failover main-workflow
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new
Date(), 0));
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryAllWorkflowInstance())
+ .hasSize(2)
+ .allSatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
+ });
+ });
+
+ assertThat(repository.queryAllTaskInstance()).filteredOn(
+ taskInstance -> taskInstance.getId() > 2 &&
taskInstance.getState() == TaskExecutionStatus.SUCCESS)
+ .hasSize(2);
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ public void
testMasterFailover_runningWorkflow_takeOverSubWorkflowOnChildNotHealthy() {
+ final String yaml =
"/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_one_sub_workflows")).findFirst()
+ .orElse(null);
+ final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow")).findFirst().orElse(null);
+
+ final WorkflowInstance mainWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_sub_workflow_running-20250424180000000"))
+ .findFirst()
+ .orElse(null);
+ final WorkflowInstance submittedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_submitted-20250424180000000")).findFirst()
+ .orElse(null);
+
+ assertThat(mainWorkflow).isNotNull();
+ assertThat(subWorkflow).isNotNull();
+ assertThat(mainWorkflowInstance).isNotNull();
+ assertThat(submittedSubWorkflowInstance).isNotNull();
+
+ MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(mainWorkflowInstance.getHost())
+ .build();
+ MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(submittedSubWorkflowInstance.getHost())
+ .build();
+
+ // first start workflow to simulate the normal parent workflow
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new
Date(), 0));
+
+ final String mainMasterFailoverNodePath =
RegistryUtils.getFailoveredNodePath(
+ masterServerMain.getAddress(),
+ masterServerMain.getServerStartupTime(),
+ masterServerMain.getProcessId());
+ // wait failover main-workflow
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+ });
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(1).getState())
+ .isEqualTo(WorkflowExecutionStatus.SUCCESS);
+ });
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryAllWorkflowInstance())
+ .hasSize(3)
+ .filteredOn(workflowInstance ->
workflowInstance.getId() == 3)
+ .allSatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
+ });
+ });
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryAllTaskInstance())
+ .hasSize(3)
+ .filteredOn(taskInstance -> taskInstance.getId() >
1)
+ .allSatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+
.isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+
+ }
+
+ @Test
+ public void
testMasterFailover_readyStopWorkflow_takeOverSubWorkflowOnChildNotHealthy() {
+ final String yaml =
"/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_one_sub_workflows")).findFirst()
+ .orElse(null);
+ final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow")).findFirst().orElse(null);
+
+ final WorkflowInstance mainWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_sub_workflow_running-20250424180000000"))
+ .findFirst()
+ .orElse(null);
+ final WorkflowInstance submittedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_submitted-20250424180000000")).findFirst()
+ .orElse(null);
+
+ final WorkflowInstance stopppedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_stopped-20250424180000000")).findFirst()
+ .orElse(null);
+
+ final WorkflowInstance pausedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_paused-20250424180000000")).findFirst()
+ .orElse(null);
+
+ assertThat(mainWorkflow).isNotNull();
+ assertThat(subWorkflow).isNotNull();
+ assertThat(mainWorkflowInstance).isNotNull();
+ assertThat(submittedSubWorkflowInstance).isNotNull();
+ assertThat(stopppedSubWorkflowInstance).isNotNull();
+ assertThat(pausedSubWorkflowInstance).isNotNull();
+
+ MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(mainWorkflowInstance.getHost())
+ .build();
+ MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(submittedSubWorkflowInstance.getHost())
+ .build();
+
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new
Date(), 0));
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new
Date(), 0));
+
+ final String mainMasterFailoverNodePath =
RegistryUtils.getFailoveredNodePath(
+ masterServerMain.getAddress(),
+ masterServerMain.getServerStartupTime(),
+ masterServerMain.getProcessId());
+ // wait failover main-workflow
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+ });
+ // wait main-workflow stop
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(mainWorkflow))
+ .hasSize(1)
+ .allSatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.STOP);
+ });
+ });
+
+ assertThat(repository.queryAllWorkflowInstance().size()).isEqualTo(4);
+ assertThat(repository.queryAllTaskInstance())
+ .hasSize(6)
+ .filteredOn(taskInstance -> taskInstance.getId() > 4)
+ .anySatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+ .isEqualTo(TaskExecutionStatus.KILL);
+ });
+
+ masterContainer.assertAllResourceReleased();
+
+ }
+
+ @Test
+ public void
testMasterFailover_readyPauseWorkflow_takeOverSubWorkflowOnChildNotHealthy() {
+ final String yaml =
+
"/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_one_sub_workflows")).findFirst()
+ .orElse(null);
+ final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow")).findFirst().orElse(null);
+
+ final WorkflowInstance mainWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("workflow_with_sub_workflow_running-20250424180000000"))
+ .findFirst()
+ .orElse(null);
+ final WorkflowInstance submittedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_submitted-20250424180000000")).findFirst()
+ .orElse(null);
+
+ final WorkflowInstance stopppedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_stopped-20250424180000000")).findFirst()
+ .orElse(null);
+
+ final WorkflowInstance pausedSubWorkflowInstance =
context.getWorkflowInstances().stream()
+ .filter(workflow ->
workflow.getName().equals("sub_workflow_paused-20250424180000000")).findFirst()
+ .orElse(null);
+
+ assertThat(mainWorkflow).isNotNull();
+ assertThat(subWorkflow).isNotNull();
+ assertThat(mainWorkflowInstance).isNotNull();
+ assertThat(submittedSubWorkflowInstance).isNotNull();
+ assertThat(stopppedSubWorkflowInstance).isNotNull();
+ assertThat(pausedSubWorkflowInstance).isNotNull();
+
+ MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(mainWorkflowInstance.getHost())
+ .build();
+ MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+ .cpuUsage(0.2)
+ .memoryUsage(0.4)
+ .serverStatus(ServerStatus.NORMAL)
+ .address(submittedSubWorkflowInstance.getHost())
+ .build();
+
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new
Date(), 0));
+ systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new
Date(), 0));
+
+ final String mainMasterFailoverNodePath =
RegistryUtils.getFailoveredNodePath(
+ masterServerMain.getAddress(),
+ masterServerMain.getServerStartupTime(),
+ masterServerMain.getProcessId());
+ // wait failover main-workflow
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+ });
+ // wait main-workflow stop
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(mainWorkflow))
+ .hasSize(1)
+ .allSatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.PAUSE);
+ });
+ });
+
+ assertThat(repository.queryAllWorkflowInstance().size()).isEqualTo(4);
+ assertThat(repository.queryAllTaskInstance())
+ .hasSize(6)
+ .filteredOn(taskInstance -> taskInstance.getId() > 4)
+ .allSatisfy(taskInstance -> {
+ assertThat(taskInstance.getState())
+ .isEqualTo(TaskExecutionStatus.PAUSE);
+ });
+
+ masterContainer.assertAllResourceReleased();
+
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
b/dolphinscheduler-master/src/test/resources/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
similarity index 53%
copy from
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to
dolphinscheduler-master/src/test/resources/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
index 6d1e1e24ee..35b0231de3 100644
---
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
@@ -25,7 +25,7 @@ project:
updateTime: 2025-04-24 00:00:00
workflows:
- - name: workflow_with_one_sub_workflow_running
+ - name: workflow_with_one_sub_workflows
code: 1
version: 1
projectCode: 1
@@ -35,7 +35,7 @@ workflows:
updateTime: 2025-04-24 00:00:00
userId: 1
executionType: PARALLEL
- - name: sub_workflow_running
+ - name: sub_workflow
code: 2
version: 1
projectCode: 1
@@ -47,7 +47,7 @@ workflows:
executionType: PARALLEL
tasks:
- - name: sub_workflow_task
+ - name: sub_workflow_task_submitted
code: 1
version: 1
projectCode: 1
@@ -58,31 +58,57 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
- - name: fake_task_A
+ taskPriority: MEDIUM
+ - name: sub_workflow_task_stopped
code: 2
version: 1
projectCode: 1
userId: 1
+ taskType: SUB_WORKFLOW
+ taskParams:
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+ workerGroup: default
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
+ taskExecuteType: BATCH
+ taskPriority: MEDIUM
+ - name: sub_workflow_task_paused
+ code: 3
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: SUB_WORKFLOW
+ taskParams:
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+ workerGroup: default
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
+ taskExecuteType: BATCH
+ taskPriority: MEDIUM
+ - name: fake_task_A
+ code: 4
+ version: 1
+ projectCode: 1
+ userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
workerGroup: default
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
workflowInstances:
- id: 1
- name: workflow_with_one_sub_workflow_running-20250424180000000
+ name: workflow_with_sub_workflow_running-20250424180000000
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
projectCode: 1
- state: RUNNING_EXECUTION
+ state: READY_PAUSE
recovery: NO
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 1.2.3.4:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -95,16 +121,60 @@ workflowInstances:
varPool: '[]'
dryRun: 0
- id: 2
- name: sub_workflow_running-20250424180000000
+ name: sub_workflow_submitted-20250424180000000
workflowDefinitionCode: 2
workflowDefinitionVersion: 1
projectCode: 1
- state: RUNNING_EXECUTION
+ state: SUBMITTED_SUCCESS
+ recovery: NO
+ startTime: 2025-04-24 18:00:00
+ endTime: null
+ runTimes: 1
+ host: 5.6.7.8:5678
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-04-24 18:00:00
+ isSubWorkflow: YES
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+ - id: 3
+ name: sub_workflow_stopped-20250424180000000
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: STOP
recovery: NO
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 5.6.7.8:5678
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-04-24 18:00:00
+ isSubWorkflow: YES
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+ - id: 4
+ name: sub_workflow_paused-20250424180000000
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: PAUSE
+ recovery: NO
+ startTime: 2025-04-24 18:00:00
+ endTime: null
+ runTimes: 1
+ host: 5.6.7.8:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -120,7 +190,7 @@ workflowInstances:
taskInstances:
- id: 1
- name: sub_workflow_task
+ name: sub_workflow_task_submitted
taskType: SUB_WORKFLOW
workflowInstanceId: 1
workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
@@ -143,13 +213,14 @@ taskInstances:
varPool: '[]'
taskExecuteType: BATCH
appLink: '{"subWorkflowInstanceId":2}'
+ taskInstancePriority: MEDIUM
- id: 2
- name: fake_task_A
- taskType: LogicFakeTask
- workflowInstanceId: 2
+ name: sub_workflow_task_stopped
+ taskType: SUB_WORKFLOW
+ workflowInstanceId: 1
workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
projectCode: 1
- taskCode: 2
+ taskCode: 2
taskDefinitionVersion: 1
state: RUNNING_EXECUTION
firstSubmitTime: 2025-04-24 18:00:00
@@ -158,7 +229,7 @@ taskInstances:
retryTimes: 0
host: 127.0.0.1:1234
maxRetryTimes: 0
- taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
+ taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":2}'
flag: YES
retryInterval: 0
delayTime: 0
@@ -166,6 +237,33 @@ taskInstances:
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
+ appLink: '{"subWorkflowInstanceId":3}'
+ taskInstancePriority: MEDIUM
+ - id: 3
+ name: sub_workflow_task_paused
+ taskType: SUB_WORKFLOW
+ workflowInstanceId: 1
+ workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
+ projectCode: 1
+ taskCode: 3
+ taskDefinitionVersion: 1
+ state: RUNNING_EXECUTION
+ firstSubmitTime: 2025-04-24 18:00:00
+ submitTime: 2025-04-24 18:00:00
+ startTime: 2025-04-24 18:00:00
+ retryTimes: 0
+ host: 127.0.0.1:1234
+ maxRetryTimes: 0
+ taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":2}'
+ flag: YES
+ retryInterval: 0
+ delayTime: 0
+ workerGroup: default
+ executorId: 1
+ varPool: '[]'
+ taskExecuteType: BATCH
+ appLink: '{"subWorkflowInstanceId":4}'
+ taskInstancePriority: MEDIUM
taskRelations:
- projectCode: 1
@@ -178,7 +276,7 @@ taskRelations:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
- projectCode: 1
- workflowDefinitionCode: 2
+ workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
@@ -186,3 +284,21 @@ taskRelations:
postTaskVersion: 1
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 3
+ postTaskVersion: 1
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 4
+ postTaskVersion: 1
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
b/dolphinscheduler-master/src/test/resources/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
similarity index 53%
copy from
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to
dolphinscheduler-master/src/test/resources/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
index 6d1e1e24ee..3d81ed4dc5 100644
---
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
@@ -25,7 +25,7 @@ project:
updateTime: 2025-04-24 00:00:00
workflows:
- - name: workflow_with_one_sub_workflow_running
+ - name: workflow_with_one_sub_workflows
code: 1
version: 1
projectCode: 1
@@ -35,7 +35,7 @@ workflows:
updateTime: 2025-04-24 00:00:00
userId: 1
executionType: PARALLEL
- - name: sub_workflow_running
+ - name: sub_workflow
code: 2
version: 1
projectCode: 1
@@ -47,7 +47,7 @@ workflows:
executionType: PARALLEL
tasks:
- - name: sub_workflow_task
+ - name: sub_workflow_task_submitted
code: 1
version: 1
projectCode: 1
@@ -58,31 +58,57 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
- - name: fake_task_A
+ taskPriority: MEDIUM
+ - name: sub_workflow_task_stopped
code: 2
version: 1
projectCode: 1
userId: 1
+ taskType: SUB_WORKFLOW
+ taskParams:
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+ workerGroup: default
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
+ taskExecuteType: BATCH
+ taskPriority: MEDIUM
+ - name: sub_workflow_task_paused
+ code: 3
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: SUB_WORKFLOW
+ taskParams:
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+ workerGroup: default
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
+ taskExecuteType: BATCH
+ taskPriority: MEDIUM
+ - name: fake_task_A
+ code: 4
+ version: 1
+ projectCode: 1
+ userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
workerGroup: default
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
workflowInstances:
- id: 1
- name: workflow_with_one_sub_workflow_running-20250424180000000
+ name: workflow_with_sub_workflow_running-20250424180000000
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
projectCode: 1
- state: RUNNING_EXECUTION
+ state: READY_STOP
recovery: NO
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 1.2.3.4:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -95,16 +121,60 @@ workflowInstances:
varPool: '[]'
dryRun: 0
- id: 2
- name: sub_workflow_running-20250424180000000
+ name: sub_workflow_submitted-20250424180000000
workflowDefinitionCode: 2
workflowDefinitionVersion: 1
projectCode: 1
- state: RUNNING_EXECUTION
+ state: SUBMITTED_SUCCESS
+ recovery: NO
+ startTime: 2025-04-24 18:00:00
+ endTime: null
+ runTimes: 1
+ host: 5.6.7.8:5678
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-04-24 18:00:00
+ isSubWorkflow: YES
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+ - id: 3
+ name: sub_workflow_stopped-20250424180000000
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: STOP
recovery: NO
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 5.6.7.8:5678
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-04-24 18:00:00
+ isSubWorkflow: YES
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+ - id: 4
+ name: sub_workflow_paused-20250424180000000
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: PAUSE
+ recovery: NO
+ startTime: 2025-04-24 18:00:00
+ endTime: null
+ runTimes: 1
+ host: 5.6.7.8:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -120,7 +190,7 @@ workflowInstances:
taskInstances:
- id: 1
- name: sub_workflow_task
+ name: sub_workflow_task_submitted
taskType: SUB_WORKFLOW
workflowInstanceId: 1
workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
@@ -143,13 +213,14 @@ taskInstances:
varPool: '[]'
taskExecuteType: BATCH
appLink: '{"subWorkflowInstanceId":2}'
+ taskInstancePriority: MEDIUM
- id: 2
- name: fake_task_A
- taskType: LogicFakeTask
- workflowInstanceId: 2
+ name: sub_workflow_task_stopped
+ taskType: SUB_WORKFLOW
+ workflowInstanceId: 1
workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
projectCode: 1
- taskCode: 2
+ taskCode: 2
taskDefinitionVersion: 1
state: RUNNING_EXECUTION
firstSubmitTime: 2025-04-24 18:00:00
@@ -158,7 +229,7 @@ taskInstances:
retryTimes: 0
host: 127.0.0.1:1234
maxRetryTimes: 0
- taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
+ taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":2}'
flag: YES
retryInterval: 0
delayTime: 0
@@ -166,6 +237,33 @@ taskInstances:
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
+ appLink: '{"subWorkflowInstanceId":3}'
+ taskInstancePriority: MEDIUM
+ - id: 3
+ name: sub_workflow_task_paused
+ taskType: SUB_WORKFLOW
+ workflowInstanceId: 1
+ workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
+ projectCode: 1
+ taskCode: 3
+ taskDefinitionVersion: 1
+ state: RUNNING_EXECUTION
+ firstSubmitTime: 2025-04-24 18:00:00
+ submitTime: 2025-04-24 18:00:00
+ startTime: 2025-04-24 18:00:00
+ retryTimes: 0
+ host: 127.0.0.1:1234
+ maxRetryTimes: 0
+ taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":2}'
+ flag: YES
+ retryInterval: 0
+ delayTime: 0
+ workerGroup: default
+ executorId: 1
+ varPool: '[]'
+ taskExecuteType: BATCH
+ appLink: '{"subWorkflowInstanceId":4}'
+ taskInstancePriority: MEDIUM
taskRelations:
- projectCode: 1
@@ -178,7 +276,7 @@ taskRelations:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
- projectCode: 1
- workflowDefinitionCode: 2
+ workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
@@ -186,3 +284,21 @@ taskRelations:
postTaskVersion: 1
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 3
+ postTaskVersion: 1
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 4
+ postTaskVersion: 1
+ createTime: 2025-04-24 00:00:00
+ updateTime: 2025-04-24 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
similarity index 78%
copy from
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
index 6d1e1e24ee..a97c4f4e45 100644
---
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
@@ -25,7 +25,7 @@ project:
updateTime: 2025-04-24 00:00:00
workflows:
- - name: workflow_with_one_sub_workflow_running
+ - name: workflow_with_one_sub_workflows
code: 1
version: 1
projectCode: 1
@@ -35,7 +35,7 @@ workflows:
updateTime: 2025-04-24 00:00:00
userId: 1
executionType: PARALLEL
- - name: sub_workflow_running
+ - name: sub_workflow
code: 2
version: 1
projectCode: 1
@@ -47,7 +47,7 @@ workflows:
executionType: PARALLEL
tasks:
- - name: sub_workflow_task
+ - name: sub_workflow_task_submitted
code: 1
version: 1
projectCode: 1
@@ -58,8 +58,9 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
- name: fake_task_A
- code: 2
+ code: 4
version: 1
projectCode: 1
userId: 1
@@ -69,11 +70,12 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
workflowInstances:
- id: 1
- name: workflow_with_one_sub_workflow_running-20250424180000000
+ name: workflow_with_sub_workflow_running-20250424180000000
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
projectCode: 1
@@ -82,7 +84,7 @@ workflowInstances:
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 1.2.3.4:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -95,16 +97,16 @@ workflowInstances:
varPool: '[]'
dryRun: 0
- id: 2
- name: sub_workflow_running-20250424180000000
+ name: sub_workflow_submitted-20250424180000000
workflowDefinitionCode: 2
workflowDefinitionVersion: 1
projectCode: 1
- state: RUNNING_EXECUTION
+ state: SUBMITTED_SUCCESS
recovery: NO
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 5.6.7.8:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -120,7 +122,7 @@ workflowInstances:
taskInstances:
- id: 1
- name: sub_workflow_task
+ name: sub_workflow_task_submitted
taskType: SUB_WORKFLOW
workflowInstanceId: 1
workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
@@ -143,29 +145,7 @@ taskInstances:
varPool: '[]'
taskExecuteType: BATCH
appLink: '{"subWorkflowInstanceId":2}'
- - id: 2
- name: fake_task_A
- taskType: LogicFakeTask
- workflowInstanceId: 2
- workflowInstanceName:
workflow_with_one_sub_workflow_running-20240816071251690
- projectCode: 1
- taskCode: 2
- taskDefinitionVersion: 1
- state: RUNNING_EXECUTION
- firstSubmitTime: 2025-04-24 18:00:00
- submitTime: 2025-04-24 18:00:00
- startTime: 2025-04-24 18:00:00
- retryTimes: 0
- host: 127.0.0.1:1234
- maxRetryTimes: 0
- taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
- flag: YES
- retryInterval: 0
- delayTime: 0
- workerGroup: default
- executorId: 1
- varPool: '[]'
- taskExecuteType: BATCH
+ taskInstancePriority: MEDIUM
taskRelations:
- projectCode: 1
@@ -182,7 +162,7 @@ taskRelations:
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
- postTaskCode: 2
+ postTaskCode: 4
postTaskVersion: 1
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
index 6d1e1e24ee..1f858d9991 100644
---
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
@@ -58,6 +58,7 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
- name: fake_task_A
code: 2
version: 1
@@ -69,6 +70,7 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
workflowInstances:
@@ -143,6 +145,7 @@ taskInstances:
varPool: '[]'
taskExecuteType: BATCH
appLink: '{"subWorkflowInstanceId":2}'
+ taskInstancePriority: MEDIUM
- id: 2
name: fake_task_A
taskType: LogicFakeTask
@@ -166,6 +169,7 @@ taskInstances:
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
+ taskInstancePriority: MEDIUM
taskRelations:
- projectCode: 1
diff --git
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml
similarity index 93%
copy from
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml
index 6d1e1e24ee..06a59e5bdc 100644
---
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml
@@ -58,6 +58,7 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
- name: fake_task_A
code: 2
version: 1
@@ -69,6 +70,7 @@ tasks:
createTime: 2025-04-24 00:00:00
updateTime: 2025-04-24 00:00:00
taskExecuteType: BATCH
+ taskPriority: MEDIUM
workflowInstances:
@@ -82,7 +84,7 @@ workflowInstances:
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 1.2.3.4:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -104,7 +106,7 @@ workflowInstances:
startTime: 2025-04-24 18:00:00
endTime: null
runTimes: 1
- host: 127.0.0.1:5678
+ host: 5.6.7.8:5678
commandType: START_PROCESS
commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
@@ -143,6 +145,7 @@ taskInstances:
varPool: '[]'
taskExecuteType: BATCH
appLink: '{"subWorkflowInstanceId":2}'
+ taskInstancePriority: MEDIUM
- id: 2
name: fake_task_A
taskType: LogicFakeTask
@@ -166,6 +169,7 @@ taskInstances:
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
+ taskInstancePriority: MEDIUM
taskRelations:
- projectCode: 1