This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 66c1d87f7a [Fix-17109] Recovered workflow host may be incorrect (#17112)
66c1d87f7a is described below
commit 66c1d87f7aba8dae729dc409b8cbf2317d64a24f
Author: lile <[email protected]>
AuthorDate: Fri Apr 11 10:14:31 2025 +0800
[Fix-17109] Recovered workflow host may be incorrect (#17112)
---
.../handler/RecoverFailureTaskCommandHandler.java | 5 +
.../handler/WorkflowFailoverCommandHandler.java | 5 +
.../master/AbstractMasterIntegrationTestCase.java | 8 ++
.../cases/WorkflowInstanceFailoverTestCase.java | 60 +++++++++++
...WorkflowInstanceRecoverFailureTaskTestCase.java | 51 ++++++++++
.../cases/WorkflowInstanceRecoverStopTestCase.java | 50 ++++++++++
...nning_workflowInstance_from_another_master.yaml | 110 ++++++++++++++++++++
.../failure_workflow_from_another_master.yaml | 111 +++++++++++++++++++++
.../stopped_workflow_from_another_master.yaml | 111 +++++++++++++++++++++
9 files changed, 511 insertions(+)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
index 5ad06eaa48..7ef570eab8 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
@@ -67,6 +68,9 @@ public class RecoverFailureTaskCommandHandler extends
AbstractCommandHandler {
@Autowired
private TaskInstanceFactories taskInstanceFactories;
+ @Autowired
+ private MasterConfig masterConfig;
+
/**
* Generate the recover workflow instance.
* <p> Will use the origin workflow instance, but will update the
following fields. Need to note we cannot not
@@ -90,6 +94,7 @@ public class RecoverFailureTaskCommandHandler extends
AbstractCommandHandler {
workflowInstance.setVarPool(null);
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
command.getCommandType().name());
workflowInstance.setCommandType(command.getCommandType());
+ workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstanceDao.updateById(workflowInstance);
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 24d117fd87..45716ec1dd 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import
org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
@@ -58,6 +59,9 @@ public class WorkflowFailoverCommandHandler extends
AbstractCommandHandler {
@Autowired
private ApplicationContext applicationContext;
+ @Autowired
+ private MasterConfig masterConfig;
+
/**
* Generate the recover workflow instance.
* <p> Will use the origin workflow instance, but will update the
following fields. Need to note we cannot not
@@ -86,6 +90,7 @@ public class WorkflowFailoverCommandHandler extends
AbstractCommandHandler {
"The WorkflowFailoverCommandParam: " +
command.getCommandParam() + " is invalid");
}
workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus());
+ workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstanceDao.updateById(workflowInstance);
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/AbstractMasterIntegrationTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/AbstractMasterIntegrationTestCase.java
index 91d11ee714..d6fdfb0c80 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/AbstractMasterIntegrationTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/AbstractMasterIntegrationTestCase.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.integration.MasterContainer;
import org.apache.dolphinscheduler.server.master.integration.Repository;
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
@@ -52,4 +54,10 @@ public abstract class AbstractMasterIntegrationTestCase {
@Autowired
protected MasterContainer masterContainer;
+
+ @Autowired
+ protected RegistryClient registryClient;
+
+ @Autowired
+ protected MasterConfig masterConfig;
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
index 4ef678fe51..ce95719e97 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
@@ -24,7 +24,12 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.extract.base.client.Clients;
+import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
import
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
@@ -555,4 +560,59 @@ public class WorkflowInstanceFailoverTestCase extends
AbstractMasterIntegrationT
masterContainer.assertAllResourceReleased();
}
+ @Test
+ public void testGlobalFailover_runningWorkflow_fromAnotherMaster() {
+ final String yaml =
"/it/failover/running_workflowInstance_from_another_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date()));
+
+ final String masterFailoverNodePath =
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
+ "127.0.0.1:15678");
+ // wait failover process
+ await()
+ .atMost(Duration.ofMinutes(3))
+ .untilAsserted(() -> {
+
assertThat(registryClient.exists(masterFailoverNodePath)).isTrue();
+ });
+
+ // check workflow's status and can stop it
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+ assertThat(workflowInstance.getName())
+
.isEqualTo("workflow_with_one_fake_task_running-20250322201900000");
+
+ final WorkflowInstanceStopResponse
stopResponse = Clients
+
.withService(IWorkflowControlClient.class)
+ .withHost(workflowInstance.getHost())
+ .stopWorkflowInstance(
+ new
WorkflowInstanceStopRequest(workflowInstance.getId()));
+
+ assertThat((stopResponse != null &&
stopResponse.isSuccess())).isTrue();
+ });
+ });
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.STOP);
+ assertThat(workflowInstance.getName())
+
.isEqualTo("workflow_with_one_fake_task_running-20250322201900000");
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverFailureTaskTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverFailureTaskTestCase.java
index c10b481e0c..6df4edb121 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverFailureTaskTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverFailureTaskTestCase.java
@@ -23,7 +23,12 @@ import static org.awaitility.Awaitility.await;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.extract.base.client.Clients;
+import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
import
org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
@@ -89,4 +94,50 @@ public class WorkflowInstanceRecoverFailureTaskTestCase
extends AbstractMasterIn
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test recover a failure workflow from another master")
+ public void testRecoverFailureWorkflow_from_another_master() {
+ final String yaml =
"/it/recover_failure_tasks/failure_workflow_from_another_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final Integer workflowInstanceId =
context.getWorkflowInstance().getId();
+ workflowOperator.recoverFailureTasks(workflowInstanceId);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+ assertThat(workflowInstance.getName())
+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
+
+ final WorkflowInstanceStopResponse
stopResponse = Clients
+
.withService(IWorkflowControlClient.class)
+ .withHost(workflowInstance.getHost())
+ .stopWorkflowInstance(
+ new
WorkflowInstanceStopRequest(workflowInstance.getId()));
+
+ assertThat(stopResponse != null &&
stopResponse.isSuccess()).isTrue();
+ });
+ });
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.STOP);
+ assertThat(workflowInstance.getName())
+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverStopTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverStopTestCase.java
index 491dea9e9b..a4616fd88b 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverStopTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverStopTestCase.java
@@ -24,7 +24,11 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.extract.base.client.Clients;
+import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import
org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
+import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
@@ -121,4 +125,50 @@ public class WorkflowInstanceRecoverStopTestCase extends
AbstractMasterIntegrati
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test recover a stopped workflow from another master")
+ public void testRecoverStoppedWorkflow_from_another_master() {
+ final String yaml =
"/it/recover_stopped/stopped_workflow_from_another_master.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final Integer workflowInstanceId =
context.getWorkflowInstance().getId();
+
assertThat(workflowOperator.recoverSuspendWorkflowInstance(workflowInstanceId).isSuccess()).isTrue();
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+ assertThat(workflowInstance.getName())
+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
+
+ final WorkflowInstanceStopResponse
stopResponse = Clients
+
.withService(IWorkflowControlClient.class)
+ .withHost(workflowInstance.getHost())
+ .stopWorkflowInstance(
+ new
WorkflowInstanceStopRequest(workflowInstance.getId()));
+
+ assertThat(stopResponse != null &&
stopResponse.isSuccess()).isTrue();
+ });
+ });
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.STOP);
+ assertThat(workflowInstance.getName())
+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_from_another_master.yaml
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_from_another_master.yaml
new file mode 100644
index 0000000000..93f03e240d
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_from_another_master.yaml
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_running
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+workflowInstance:
+ id: 1
+ name: workflow_with_one_fake_task_running-20250322201900000
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: RUNNING_EXECUTION
+ recovery: NO
+ startTime: 2025-03-22 20:19:00
+ endTime: null
+ runTimes: 1
+ host: "127.0.0.1:15678"
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-03-22 20:19:00
+ isSubWorkflow: NO
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+
+taskInstances:
+ - id: 1
+ name: A
+ taskType: LogicFakeTask
+ workflowInstanceId: 1
+ workflowInstanceName: workflow_with_one_fake_task_running-20250322201900000
+ projectCode: 1
+ taskCode: 1
+ taskDefinitionVersion: 1
+ state: RUNNING_EXECUTION
+ firstSubmitTime: 2025-03-22 20:19:00
+ submitTime: 2025-03-22 20:19:00
+ startTime: 2025-03-22 20:19:00
+ retryTimes: 0
+ host: 127.0.0.1:1234
+ maxRetryTimes: 0
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
+ flag: YES
+ retryInterval: 0
+ delayTime: 0
+ workerGroup: default
+ executorId: 1
+ varPool: '[]'
+ taskExecuteType: BATCH
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
+ workerGroup: default
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_from_another_master.yaml
b/dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_from_another_master.yaml
new file mode 100644
index 0000000000..f4047feb25
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_from_another_master.yaml
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_killed
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+workflowInstance:
+ id: 1
+ name: workflow_with_one_fake_task_killed-20250322201900000
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: FAILURE
+ recovery: NO
+ startTime: 2025-03-22 20:19:00
+ endTime: null
+ runTimes: 1
+ host: "127.0.0.1:15678"
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-03-22 20:19:00
+ isSubWorkflow: NO
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+
+taskInstances:
+ - id: 1
+ name: A
+ taskType: LogicFakeTask
+ workflowInstanceId: 1
+ workflowInstanceName: workflow_with_one_fake_task_killed-20250322201900000
+ projectCode: 1
+ taskCode: 1
+ taskDefinitionVersion: 1
+ state: FAILURE
+ firstSubmitTime: 2025-03-22 20:19:00
+ submitTime: 2025-03-22 20:19:00
+ startTime: 2025-03-22 20:19:00
+ endTime: 2025-03-22 20:19:01
+ retryTimes: 0
+ host: 127.0.0.1:1234
+ maxRetryTimes: 0
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
+ flag: YES
+ retryInterval: 0
+ delayTime: 0
+ workerGroup: default
+ executorId: 1
+ varPool: '[]'
+ taskExecuteType: BATCH
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
+ workerGroup: default
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
diff --git
a/dolphinscheduler-master/src/test/resources/it/recover_stopped/stopped_workflow_from_another_master.yaml
b/dolphinscheduler-master/src/test/resources/it/recover_stopped/stopped_workflow_from_another_master.yaml
new file mode 100644
index 0000000000..cc8d31c51b
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/recover_stopped/stopped_workflow_from_another_master.yaml
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_killed
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+workflowInstance:
+ id: 1
+ name: workflow_with_one_fake_task_killed-20250322201900000
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ projectCode: 1
+ state: STOP
+ recovery: NO
+ startTime: 2025-03-22 20:19:00
+ endTime: null
+ runTimes: 1
+ host: "127.0.0.1:15678"
+ commandType: START_PROCESS
+ commandParam:
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+ taskDependType: TASK_POST
+ commandStartTime: 2025-03-22 20:19:00
+ isSubWorkflow: NO
+ executorId: 1
+ historyCmd: START_PROCESS
+ workerGroup: default
+ globalParams: '[]'
+ varPool: '[]'
+ dryRun: 0
+
+taskInstances:
+ - id: 1
+ name: A
+ taskType: LogicFakeTask
+ workflowInstanceId: 1
+ workflowInstanceName: workflow_with_one_fake_task_killed-20250322201900000
+ projectCode: 1
+ taskCode: 1
+ taskDefinitionVersion: 1
+ state: KILL
+ firstSubmitTime: 2025-03-22 20:19:00
+ submitTime: 2025-03-22 20:19:00
+ startTime: 2025-03-22 20:19:00
+ endTime: 2025-03-22 20:19:01
+ retryTimes: 0
+ host: 127.0.0.1:1234
+ maxRetryTimes: 0
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
+ flag: YES
+ retryInterval: 0
+ delayTime: 0
+ workerGroup: default
+ executorId: 1
+ varPool: '[]'
+ taskExecuteType: BATCH
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
+ workerGroup: default
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2025-03-22 00:00:00
+ updateTime: 2025-03-22 00:00:00