This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 98dfd64444 Clear the task group data when delete a
project/workflowInstance (#13383)
98dfd64444 is described below
commit 98dfd6444433ac1b3e909760ac70f5b9bb14cf20
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jan 12 21:51:51 2023 +0800
Clear the task group data when delete a project/workflowInstance (#13383)
---
.../api/service/TaskGroupQueueService.java | 7 +++++
.../api/service/TaskGroupService.java | 2 ++
.../api/service/impl/ExecutorServiceImpl.java | 2 ++
.../api/service/impl/ProjectServiceImpl.java | 9 +++++++
.../service/impl/TaskGroupQueueServiceImpl.java | 23 ++++++++++++++++
.../api/service/impl/TaskGroupServiceImpl.java | 19 ++++++++++---
.../api/service/impl/TaskInstanceServiceImpl.java | 6 +++++
.../api/service/ProjectServiceTest.java | 3 +++
.../dolphinscheduler/dao/entity/TaskGroup.java | 2 +-
.../dao/mapper/TaskGroupMapper.java | 3 +++
.../dao/mapper/TaskGroupQueueMapper.java | 6 +++++
.../dao/mapper/TaskGroupMapper.xml | 9 ++++++-
.../dao/mapper/TaskGroupQueueMapper.xml | 23 ++++++++++++++++
.../service/process/ProcessServiceImpl.java | 31 +++++++++++++---------
14 files changed, 126 insertions(+), 19 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
index a0d05c5b8d..72aa498241 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.entity.User;
+import java.util.List;
import java.util.Map;
/**
@@ -68,7 +69,13 @@ public interface TaskGroupQueueService {
*/
boolean deleteByTaskId(int taskId);
+ void deleteByTaskInstanceIds(List<Integer> taskInstanceIds);
+
+ void deleteByWorkflowInstanceId(Integer workflowInstanceId);
+
void forceStartTask(int queueId, int forceStart);
void modifyPriority(Integer queueId, Integer priority);
+
+ void deleteByTaskGroupIds(List<Integer> taskGroupIds);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
index 3987109733..db909da158 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
@@ -139,4 +139,6 @@ public interface TaskGroupService {
Map<String, Object> forceStartTask(User loginUser, int taskId);
Map<String, Object> modifyPriority(User loginUser, Integer queueId,
Integer priority);
+
+ void deleteTaskGroupByProjectCode(long projectCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index d254b31481..2761b67ca3 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -104,6 +104,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -135,6 +136,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
@Autowired
private ProcessInstanceMapper processInstanceMapper;
+ @Lazy()
@Autowired
private ProcessService processService;
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7450679a72..2279edff6e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.TaskGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
@@ -60,6 +61,7 @@ import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -74,6 +76,10 @@ public class ProjectServiceImpl extends BaseServiceImpl
implements ProjectServic
private static final Logger logger =
LoggerFactory.getLogger(ProjectServiceImpl.class);
+ @Lazy
+ @Autowired
+ private TaskGroupService taskGroupService;
+
@Autowired
private ProjectMapper projectMapper;
@@ -446,6 +452,9 @@ public class ProjectServiceImpl extends BaseServiceImpl
implements ProjectServic
putMsg(result, Status.DELETE_PROJECT_ERROR_DEFINES_NOT_NULL);
return result;
}
+ // delete the task group
+ taskGroupService.deleteTaskGroupByProjectCode(project.getCode());
+
int delete = projectMapper.deleteById(project.getId());
if (delete > 0) {
logger.info("Project is deleted and id is :{}.", project.getId());
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
index d2f990fb3b..89f2c4f133 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
@@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
+import org.apache.commons.collections4.CollectionUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -145,6 +147,19 @@ public class TaskGroupQueueServiceImpl extends
BaseServiceImpl implements TaskGr
return taskGroupQueueMapper.deleteByTaskId(taskId) == 1;
}
+ @Override
+ public void deleteByTaskInstanceIds(List<Integer> taskInstanceIds) {
+ if (CollectionUtils.isEmpty(taskInstanceIds)) {
+ return;
+ }
+ taskGroupQueueMapper.deleteByTaskInstanceIds(taskInstanceIds);
+ }
+
+ @Override
+ public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+ taskGroupQueueMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+ }
+
@Override
public void forceStartTask(int queueId, int forceStart) {
taskGroupQueueMapper.updateForceStart(queueId, forceStart);
@@ -154,4 +169,12 @@ public class TaskGroupQueueServiceImpl extends
BaseServiceImpl implements TaskGr
public void modifyPriority(Integer queueId, Integer priority) {
taskGroupQueueMapper.modifyPriority(queueId, priority);
}
+
+ @Override
+ public void deleteByTaskGroupIds(List<Integer> taskGroupIds) {
+ if (CollectionUtils.isEmpty(taskGroupIds)) {
+ return;
+ }
+ taskGroupQueueMapper.deleteByTaskGroupIds(taskGroupIds);
+ }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index 1d722e1fd9..232268970a 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -29,8 +29,8 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
@@ -40,6 +40,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,9 +64,6 @@ public class TaskGroupServiceImpl extends BaseServiceImpl
implements TaskGroupSe
@Autowired
private TaskGroupQueueService taskGroupQueueService;
- @Autowired
- private ProcessService processService;
-
@Autowired
private ExecutorService executorService;
@@ -429,4 +427,17 @@ public class TaskGroupServiceImpl extends BaseServiceImpl
implements TaskGroupSe
putMsg(result, Status.SUCCESS);
return result;
}
+
+ @Override
+ public void deleteTaskGroupByProjectCode(long projectCode) {
+ List<TaskGroup> taskGroups =
taskGroupMapper.selectByProjectCode(projectCode);
+ if (CollectionUtils.isEmpty(taskGroups)) {
+ return;
+ }
+ List<Integer> taskGroupIds = taskGroups.stream()
+ .map(TaskGroup::getId)
+ .collect(Collectors.toList());
+ taskGroupQueueService.deleteByTaskGroupIds(taskGroupIds);
+ taskGroupMapper.deleteBatchIds(taskGroupIds);
+ }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index d8e8de5b90..0d74489f4b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import
org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -107,6 +108,9 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
@Autowired
private DqExecuteResultDao dqExecuteResultDao;
+ @Autowired
+ private TaskGroupQueueService taskGroupQueueService;
+
/**
* query task list by project, process instance, task name, task start
time, task end time, task status, keyword paging
*
@@ -384,7 +388,9 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
}
}
}
+
dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
+ taskGroupQueueService.deleteByWorkflowInstanceId(workflowInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(workflowInstanceId);
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
index 2eaeebc7a5..6be492e060 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
@@ -96,6 +96,9 @@ public class ProjectServiceTest {
@Mock
private ResourcePermissionCheckService resourcePermissionCheckService;
+ @Mock
+ private TaskGroupService taskGroupService;
+
private String projectName = "ProjectServiceTest";
private String userName = "ProjectServiceTest";
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
index 0d42d8480f..3aa49f8e17 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
@@ -72,7 +72,7 @@ public class TaskGroup implements Serializable {
*/
private Date updateTime;
/**
- * project Id
+ * project code
*/
private long projectCode;
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index 9893726873..7cb79c9e58 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -82,8 +82,11 @@ public interface TaskGroupMapper extends
BaseMapper<TaskGroup> {
/**
* listAuthorizedResource
+ *
* @param userId
* @return
*/
List<TaskGroup> listAuthorizedResource(@Param("userId") int userId);
+
+ List<TaskGroup> selectByProjectCode(@Param("projectCode") long
projectCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
index 9826b3e6d5..437a22efb7 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
@@ -99,4 +99,10 @@ public interface TaskGroupQueueMapper extends
BaseMapper<TaskGroupQueue> {
@Param("status") Integer status,
@Param("groupId") int groupId,
@Param("projects") List<Project> projects);
+
+ void deleteByTaskInstanceIds(@Param("taskInstanceIds") List<Integer>
taskInstanceIds);
+
+ void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer
workflowInstanceId);
+
+ void deleteByTaskGroupIds(@Param("taskGroupIds") List<Integer>
taskGroupIds);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
index 969077309e..6e387afb60 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
@@ -69,10 +69,17 @@
#{i}
</foreach>
</if>
- and project_code in ( #{projectCode} , 0)
+ and project_code in ( #{projectCode} , 0)
order by update_time desc
</select>
+ <select id="selectByProjectCode"
resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_group
+ where project_code = #{projectCode}
+ </select>
+
<!--modify data by id-->
<update id="robTaskGroupResource">
update t_ds_task_group
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
index 5eacbea9b8..d2c43f18a7 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
@@ -157,4 +157,27 @@
order by queue.update_time desc
</select>
+ <delete id="deleteByTaskIds">
+ delete from t_ds_task_group_queue
+ where task_id in
+ <foreach collection="taskInstanceIds" index="index" item="i" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ </delete>
+
+ <delete id="deleteByWorkflowInstanceId">
+ delete
+ from t_ds_task_group_queue
+ where process_id = #{workflowInstanceId}
+ </delete>
+
+ <delete id="deleteByTaskGroupIds">
+ delete
+ from t_ds_task_group_queue
+ where group_id in
+ <foreach collection="taskGroupIds" index="index" item="i" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ </delete>
+
</mapper>
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 865692a38e..359a36cde5 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2433,8 +2433,12 @@ public class ProcessServiceImpl implements
ProcessService {
* @param taskId task id
*/
@Override
- public boolean acquireTaskGroup(int taskId, String taskName, int groupId,
int processId, int priority) {
- TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
+ public boolean acquireTaskGroup(int taskInstanceId,
+ String taskName,
+ int taskGroupId,
+ int workflowInstanceId,
+ int taskGroupPriority) {
+ TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
if (taskGroup == null) {
// we don't throw exception here, to avoid the task group has been
deleted during workflow running
return true;
@@ -2444,17 +2448,17 @@ public class ProcessServiceImpl implements
ProcessService {
return true;
}
// Create a waiting taskGroupQueue, after acquire resource, we can
update the status to ACQUIRE_SUCCESS
- TaskGroupQueue taskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskId);
+ TaskGroupQueue taskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskInstanceId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(
- taskId,
+ taskInstanceId,
taskName,
- groupId,
- processId,
- priority,
+ taskGroupId,
+ workflowInstanceId,
+ taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE);
} else {
- logger.info("The task queue is already exist, taskId: {}", taskId);
+ logger.info("The task queue is already exist, taskId: {}",
taskInstanceId);
if (taskGroupQueue.getStatus() ==
TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true;
}
@@ -2464,19 +2468,19 @@ public class ProcessServiceImpl implements
ProcessService {
}
// check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks =
taskGroupQueueMapper.queryHighPriorityTasks(
- groupId,
- priority,
+ taskGroupId,
+ taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false;
}
// try to get taskGroup
- int count = taskGroupMapper.selectAvailableCountById(groupId);
+ int count = taskGroupMapper.selectAvailableCountById(taskGroupId);
if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
- logger.info("Success acquire taskGroup, taskInstanceId: {},
taskGroupId: {}", taskId, groupId);
+ logger.info("Success acquire taskGroup, taskInstanceId: {},
taskGroupId: {}", taskInstanceId, taskGroupId);
return true;
}
- logger.info("Failed to acquire taskGroup, taskInstanceId: {},
taskGroupId: {}", taskId, groupId);
+ logger.info("Failed to acquire taskGroup, taskInstanceId: {},
taskGroupId: {}", taskInstanceId, taskGroupId);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
return false;
}
@@ -2608,6 +2612,7 @@ public class ProcessServiceImpl implements ProcessService
{
.processId(workflowInstanceId)
.priority(taskGroupPriority)
.status(status)
+ .inQueue(Flag.NO.getCode())
.createTime(now)
.updateTime(now)
.build();