This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e119b1d7c9 [Fix-17732] Change workflow instance status to failure when
command handle failed (#17745)
e119b1d7c9 is described below
commit e119b1d7c94fe6febe2b1a8b3759cd0d03033e2a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 3 09:51:40 2025 +0800
[Fix-17732] Change workflow instance status to failure when command handle
failed (#17745)
---
.../dao/mapper/WorkflowInstanceMapper.java | 2 +
.../dao/repository/WorkflowInstanceDao.java | 2 +
.../repository/impl/WorkflowInstanceDaoImpl.java | 5 ++
.../dao/mapper/WorkflowInstanceMapper.xml | 4 ++
.../impl/WorkflowInstanceDaoImplTest.java | 9 +++
.../master/engine/command/CommandEngine.java | 22 +++++-
.../server/master/engine/graph/WorkflowGraph.java | 21 ++++--
.../integration/cases/WorkflowStartTestCase.java | 24 +++++++
.../start/workflow_with_duplicate_task_name.yaml | 81 ++++++++++++++++++++++
.../service/command/CommandServiceImpl.java | 10 +--
10 files changed, 164 insertions(+), 16 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
index dee3a559b5..eae9691d1b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
@@ -138,6 +138,8 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
@Param("originState")
WorkflowExecutionStatus originState,
@Param("targetState")
WorkflowExecutionStatus targetState);
+ int forceUpdateWorkflowInstanceState(@Param("id") Integer id,
@Param("status") WorkflowExecutionStatus status);
+
/**
* update workflow instance by tenantCode
*
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
index fb1a093bf3..52b0770109 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
@@ -39,6 +39,8 @@ public interface WorkflowInstanceDao extends
IDao<WorkflowInstance> {
WorkflowExecutionStatus originState,
WorkflowExecutionStatus targetState);
+ void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus
status);
+
/**
* find last scheduler workflow instance in the date interval
*
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index b9b92f54dd..6f35057d44 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -72,6 +72,11 @@ public class WorkflowInstanceDaoImpl extends
BaseDao<WorkflowInstance, WorkflowI
}
}
+ @Override
+ public void forceUpdateWorkflowInstanceState(Integer id,
WorkflowExecutionStatus status) {
+ mybatisMapper.forceUpdateWorkflowInstanceState(id, status);
+ }
+
/**
* find last scheduler process instance in the date interval
*
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
index fc2fb6acc2..906e72137f 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
@@ -158,6 +158,10 @@
where id = #{workflowInstanceId} and state = #{originState}
</update>
+ <update id="forceUpdateWorkflowInstanceState">
+ update t_ds_workflow_instance set state = #{status} where id = #{id}
+ </update>
+
<update id="updateWorkflowInstanceByTenantCode">
update t_ds_workflow_instance
set tenant_code = #{destTenantCode}
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
index 1860b0d389..f2f76d8f09 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
@@ -96,6 +96,15 @@ class WorkflowInstanceDaoImplTest extends BaseDaoTest {
unsupportedOperationException.getMessage());
}
+ @Test
+ void forceUpdateWorkflowInstanceState() {
+ WorkflowInstance workflowInstance = createWorkflowInstance(1L, 1,
WorkflowExecutionStatus.RUNNING_EXECUTION);
+ workflowInstanceDao.insert(workflowInstance);
+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(),
WorkflowExecutionStatus.FAILURE);
+ assertEquals(WorkflowExecutionStatus.FAILURE,
+
workflowInstanceDao.queryById(workflowInstance.getId()).getState());
+ }
+
@Test
void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
long workflowDefinitionCode = 1L;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
index 5e84a24a76..1e24695264 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -52,6 +53,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.support.TransactionTemplate;
/**
* Master scheduler thread, this thread will consume the commands from
database and trigger processInstance executed.
@@ -75,6 +77,9 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
@Autowired
private IWorkflowRepository workflowRepository;
+ @Autowired
+ private WorkflowInstanceDao workflowInstanceDao;
+
@Autowired
private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory;
@@ -84,6 +89,9 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
@Autowired
private WorkflowEventBusCoordinator workflowEventBusCoordinator;
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
private ExecutorService commandHandleThreadPool;
private boolean flag = false;
@@ -189,8 +197,18 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
throwable);
return null;
}
- log.error("Failed bootstrap command {} ",
JSONUtils.toPrettyJsonString(command), throwable);
- commandService.moveToErrorCommand(command,
ExceptionUtils.getStackTrace(throwable));
+
+ transactionTemplate.execute(status -> {
+ log.warn("Failed bootstrap command {} ",
JSONUtils.toPrettyJsonString(command), throwable);
+ final int workflowInstanceId = command.getWorkflowInstanceId();
+
+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId,
WorkflowExecutionStatus.FAILURE);
+ log.info("Set workflow instance {} state to FAILURE",
workflowInstanceId);
+
+ commandService.moveToErrorCommand(command,
ExceptionUtils.getStackTrace(throwable));
+ log.info("Move command {} to error command table",
command.getId());
+ return null;
+ });
return null;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
index 5641822816..5245c7c595 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
@@ -28,7 +28,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
public class WorkflowGraph implements IWorkflowGraph {
@@ -46,12 +45,20 @@ public class WorkflowGraph implements IWorkflowGraph {
this.predecessors = new HashMap<>();
this.successors = new HashMap<>();
- this.taskDefinitionMap = taskDefinitions
- .stream()
- .collect(Collectors.toMap(TaskDefinition::getName,
Function.identity()));
- this.taskDefinitionCodeMap = taskDefinitions
- .stream()
- .collect(Collectors.toMap(TaskDefinition::getCode,
Function.identity()));
+ this.taskDefinitionMap = new HashMap<>(taskDefinitions.size());
+ this.taskDefinitionCodeMap = new HashMap<>(taskDefinitions.size());
+ for (TaskDefinition taskDefinition : taskDefinitions) {
+ if (taskDefinitionMap.containsKey(taskDefinition.getName())) {
+ throw new IllegalArgumentException(
+ "Duplicate task name: " + taskDefinition.getName() + "
in the workflow");
+ }
+ taskDefinitionMap.put(taskDefinition.getName(), taskDefinition);
+ if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) {
+ throw new IllegalArgumentException(
+ "Duplicate task code: " + taskDefinition.getCode() + "
in the workflow");
+ }
+ taskDefinitionCodeMap.put(taskDefinition.getCode(),
taskDefinition);
+ }
addTaskNodes(taskDefinitions);
addTaskEdge(workflowTaskRelations);
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 1ae906aa3f..80eddee923 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -124,6 +124,30 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow with two fake task(A) has the same
name")
+ public void testStartWorkflow_contains_duplicateTaskName() {
+ final String yaml = "/it/start/workflow_with_duplicate_task_name.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+ .isEqualTo(WorkflowExecutionStatus.FAILURE);
+
assertThat(repository.queryTaskInstance(workflowInstanceId)).isEmpty();
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with one fake task(A) using serial
wait strategy")
public void testStartWorkflow_with_serialWaitStrategy() {
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml
new file mode 100644
index 0000000000..2597c24e07
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_duplicate_task_name
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with two parallel failed tasks
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: A
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
index fa7a0a4f14..9874548615 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.commons.lang3.StringUtils;
@@ -61,14 +60,11 @@ public class CommandServiceImpl implements CommandService {
@Autowired
private ScheduleMapper scheduleMapper;
- @Autowired
- private WorkflowDefinitionMapper processDefineMapper;
-
@Override
public void moveToErrorCommand(Command command, String message) {
- ErrorCommand errorCommand = new ErrorCommand(command, message);
- this.errorCommandMapper.insert(errorCommand);
- this.commandMapper.deleteById(command.getId());
+ final ErrorCommand errorCommand = new ErrorCommand(command, message);
+ errorCommandMapper.insert(errorCommand);
+ commandMapper.deleteById(command.getId());
}
@Override