This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b3b8c0784d Fix kill dynamic task doesn't kill the wait to run workflow
instances (#15896)
b3b8c0784d is described below
commit b3b8c0784dfc31b516b13601a311bc78550bf931
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Apr 25 11:05:51 2024 +0800
Fix kill dynamic task doesn't kill the wait to run workflow instances
(#15896)
---
.../dolphinscheduler/dao/mapper/CommandMapper.java | 9 +++-
.../dolphinscheduler/dao/mapper/CommandMapper.xml | 7 +++
.../dao/mapper/CommandMapperTest.java | 11 ++++-
.../runner/task/dynamic/DynamicLogicTask.java | 51 +++++++++++++++++++++-
4 files changed, 74 insertions(+), 4 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index a8490cbef7..9fb6643227 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -34,8 +34,9 @@ public interface CommandMapper extends BaseMapper<Command> {
/**
* count command state
- * @param startTime startTime
- * @param endTime endTime
+ *
+ * @param startTime startTime
+ * @param endTime endTime
* @param projectCodes projectCodes
* @return CommandCount list
*/
@@ -46,15 +47,19 @@ public interface CommandMapper extends BaseMapper<Command> {
/**
* query command page
+ *
* @return
*/
List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset")
int offset);
/**
* query command page by slot
+ *
* @return command list
*/
List<Command> queryCommandPageBySlot(@Param("limit") int limit,
@Param("masterCount") int masterCount,
@Param("thisMasterSlot") int
thisMasterSlot);
+
+ void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds")
List<Integer> workflowInstanceIds);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index c950f66413..56db890ef0 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -47,4 +47,11 @@
order by process_instance_priority, id asc
limit #{limit}
</select>
+ <delete id="deleteByWorkflowInstanceIds" >
+ delete from t_ds_command
+ where process_instance_id in
+ <foreach collection="workflowInstanceIds" index="index" item="i"
open="(" close=")" separator=",">
+ #{i}
+ </foreach>
+ </delete>
</mapper>
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index 3d45477d85..2d367e46e4 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.mapper;
+import static com.google.common.truth.Truth.assertThat;
+
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -173,6 +175,14 @@ public class CommandMapperTest extends BaseDaoTest {
toTestQueryCommandPageBySlot(masterCount, thisMasterSlot);
}
+ @Test
+ void deleteByWorkflowInstanceIds() {
+ Command command = createCommand();
+ assertThat(commandMapper.selectList(null)).isNotEmpty();
+
commandMapper.deleteByWorkflowInstanceIds(Lists.newArrayList(command.getProcessInstanceId()));
+ assertThat(commandMapper.selectList(null)).isEmpty();
+ }
+
private boolean toTestQueryCommandPageBySlot(int masterCount, int
thisMasterSlot) {
Command command = createCommand();
Integer id = command.getId();
@@ -280,5 +290,4 @@ public class CommandMapperTest extends BaseDaoTest {
return command;
}
-
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
index 3baa10b343..12cae5c53e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
@@ -252,12 +252,61 @@ public class DynamicLogicTask extends
BaseAsyncLogicTask<DynamicParameters> {
@Override
public void kill() {
try {
-
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
+ doKillSubWorkflowInstances();
} catch (MasterTaskExecuteException e) {
log.error("kill {} error", taskInstance.getName(), e);
}
}
+ private void doKillSubWorkflowInstances() throws
MasterTaskExecuteException {
+ List<ProcessInstance> existsSubProcessInstanceList =
+
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(),
taskInstance.getTaskCode());
+ if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) {
+ return;
+ }
+
+ commandMapper.deleteByWorkflowInstanceIds(
+
existsSubProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
+
+ List<ProcessInstance> runningSubProcessInstanceList =
+
subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
+ doKillRunningSubWorkflowInstances(runningSubProcessInstanceList);
+
+ List<ProcessInstance> waitToRunProcessInstances =
+
subWorkflowService.filterWaitToRunProcessInstances(existsSubProcessInstanceList);
+ doKillWaitToRunSubWorkflowInstances(waitToRunProcessInstances);
+
+ this.haveBeenCanceled = true;
+ }
+
+ private void doKillRunningSubWorkflowInstances(List<ProcessInstance>
runningSubProcessInstanceList) throws MasterTaskExecuteException {
+ for (ProcessInstance subProcessInstance :
runningSubProcessInstanceList) {
+ subProcessInstance.setState(WorkflowExecutionStatus.READY_STOP);
+ processInstanceDao.updateById(subProcessInstance);
+ if (subProcessInstance.getState().isFinished()) {
+ log.info("The process instance [{}] is finished, no need to
stop", subProcessInstance.getId());
+ continue;
+ }
+ try {
+ sendToSubProcess(taskExecutionContext, subProcessInstance);
+ log.info("Success send [{}] request to SubWorkflow's master:
{}", WorkflowExecutionStatus.READY_STOP,
+ subProcessInstance.getHost());
+ } catch (Exception e) {
+ throw new MasterTaskExecuteException(
+ String.format("Send stop request to SubWorkflow's
master: %s failed",
+ subProcessInstance.getHost()),
+ e);
+ }
+ }
+ }
+
+ private void doKillWaitToRunSubWorkflowInstances(List<ProcessInstance>
waitToRunWorkflowInstances) {
+ for (ProcessInstance subProcessInstance : waitToRunWorkflowInstances) {
+ subProcessInstance.setState(WorkflowExecutionStatus.STOP);
+ processInstanceDao.updateById(subProcessInstance);
+ }
+ }
+
private void
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus)
throws MasterTaskExecuteException {
this.haveBeenCanceled = true;
List<ProcessInstance> existsSubProcessInstanceList =