This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f5801a0e1c Fix task group might be robbed by multiple tasks (#13115)
f5801a0e1c is described below
commit f5801a0e1c8bebc900d75aab8ca3314bc0410c6b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Dec 11 22:58:21 2022 +0800
Fix task group might be robbed by multiple tasks (#13115)
---
.../api/service/impl/TaskGroupServiceImpl.java | 15 ++--
.../api/service/TaskGroupServiceTest.java | 11 ++-
.../dolphinscheduler/dao/entity/TaskGroup.java | 25 ++-----
.../dao/entity/TaskGroupQueue.java | 20 ++----
.../dao/mapper/TaskGroupMapper.java | 12 ++--
.../dao/mapper/TaskGroupMapper.xml | 10 +--
.../service/process/ProcessService.java | 7 +-
.../service/process/ProcessServiceImpl.java | 79 +++++++++++++---------
8 files changed, 94 insertions(+), 85 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index 9cf7024aec..1d722e1fd9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -112,11 +112,18 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
- TaskGroup taskGroup = new TaskGroup(name, projectCode, description,
- groupSize, loginUser.getId(), Flag.YES.getCode());
+ Date now = new Date();
+ TaskGroup taskGroup = TaskGroup.builder()
+ .name(name)
+ .projectCode(projectCode)
+ .description(description)
+ .groupSize(groupSize)
+ .userId(loginUser.getId())
+ .status(Flag.YES.getCode())
+ .createTime(now)
+ .updateTime(now)
+ .build();
- taskGroup.setCreateTime(new Date());
- taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) {
permissionPostHandle(AuthorizationType.TASK_GROUP,
loginUser.getId(),
Collections.singletonList(taskGroup.getId()), logger);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
index 7d475a621e..1f8f2abb4a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
@@ -103,8 +103,15 @@ public class TaskGroupServiceTest {
}
private TaskGroup getTaskGroup() {
- TaskGroup taskGroup = new TaskGroup(taskGroupName, 0, taskGroupDesc,
- 100, 1, 1);
+ TaskGroup taskGroup = TaskGroup.builder()
+ .name(taskGroupName)
+ .projectCode(0)
+ .description(taskGroupDesc)
+ .groupSize(100)
+ .userId(1)
+ .status(Flag.YES.getCode())
+ .build();
+
return taskGroup;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
index dcc9ad3147..0d42d8480f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
@@ -20,13 +20,19 @@ package org.apache.dolphinscheduler.dao.entity;
import java.io.Serializable;
import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {
@@ -70,23 +76,4 @@ public class TaskGroup implements Serializable {
*/
private long projectCode;
- public TaskGroup(String name, long projectCode, String description, int
groupSize, int userId, int status) {
- this.name = name;
- this.projectCode = projectCode;
- this.description = description;
- this.groupSize = groupSize;
- this.userId = userId;
- this.status = status;
- init();
-
- }
-
- public TaskGroup() {
- init();
- }
-
- public void init() {
- this.status = 1;
- this.useSize = 0;
- }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
index be674856c0..9302d141e3 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
@@ -22,7 +22,10 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable;
import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@@ -30,6 +33,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
@TableName("t_ds_task_group_queue")
public class TaskGroupQueue implements Serializable {
@@ -95,18 +101,4 @@ public class TaskGroupQueue implements Serializable {
* update time
*/
private Date updateTime;
-
- public TaskGroupQueue() {
-
- }
-
- public TaskGroupQueue(int taskId, String taskName, int groupId, int
processId, int priority,
- TaskGroupQueueStatus status) {
- this.taskId = taskId;
- this.taskName = taskName;
- this.groupId = groupId;
- this.processId = processId;
- this.priority = priority;
- this.status = status;
- }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index 819e70dcfb..9893726873 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -35,14 +35,10 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
*/
public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
- /**
- * compard and set to update table of task group
- *
- * @param id primary key
- * @return affected rows
- */
- int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int
queueId,
- @Param("queueStatus") int queueStatus);
+ int robTaskGroupResource(@Param("id") int id,
+ @Param("currentUseSize") int currentUseSize,
+ @Param("queueId") int queueId,
+ @Param("queueStatus") int queueStatus);
/**
* update table of task group
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
index 7d3fa96b11..969077309e 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
@@ -74,11 +74,13 @@
</select>
<!--modify data by id-->
- <update id="updateTaskGroupResource">
+ <update id="robTaskGroupResource">
update t_ds_task_group
- set use_size = use_size+1
- where id = #{id} and use_size < group_size and
- (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and
status = #{queueStatus} ) = 1
+ set use_size = use_size + 1
+ where id = #{id}
+ and use_size < group_size
+ and use_size = #{currentUseSize}
+ and (select count(1) FROM t_ds_task_group_queue where id =
#{queueId} and status = #{queueStatus}) = 1
</update>
<!--modify data by id-->
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bca4f0f7dc..e321ce5441 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -219,8 +219,11 @@ public interface ProcessService {
void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
- String taskName, Integer groupId,
- Integer processId, Integer
priority, TaskGroupQueueStatus status);
+ String taskName,
+ Integer groupId,
+ Integer processId,
+ Integer priority,
+ TaskGroupQueueStatus status);
int updateTaskGroupQueueStatus(Integer taskId, int status);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 55f6bde34c..8c171185d2 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2424,7 +2424,12 @@ public class ProcessServiceImpl implements ProcessService {
// Create a waiting taskGroupQueue, after acquire resource, we can
update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) {
- taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName,
groupId, processId, priority,
+ taskGroupQueue = insertIntoTaskGroupQueue(
+ taskId,
+ taskName,
+ groupId,
+ processId,
+ priority,
TaskGroupQueueStatus.WAIT_QUEUE);
} else {
logger.info("The task queue is already exist, taskId: {}", taskId);
@@ -2436,7 +2441,9 @@ public class ProcessServiceImpl implements ProcessService {
this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
// check if there already exist higher priority tasks
- List<TaskGroupQueue> highPriorityTasks =
taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority,
+ List<TaskGroupQueue> highPriorityTasks =
taskGroupQueueMapper.queryHighPriorityTasks(
+ groupId,
+ priority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false;
@@ -2457,20 +2464,27 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
- TaskGroup taskGroup =
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
- int affectedCount =
taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
- taskGroupQueue.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode());
- if (affectedCount > 0) {
- logger.info("Success rob taskGroup, taskInstanceId: {},
taskGroupId: {}", taskGroupQueue.getTaskId(),
- taskGroupQueue.getId());
- taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
- this.taskGroupQueueMapper.updateById(taskGroupQueue);
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
- return true;
+ // set the default max size to avoid dead loop
+ for (int i = 0; i < 10; i++) {
+ TaskGroup taskGroup =
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
+ if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
+ logger.info("The current task Group is full, taskGroup: {}",
taskGroup);
+ return false;
+ }
+ int affectedCount =
taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
+ taskGroup.getUseSize(),
+ taskGroupQueue.getId(),
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+ if (affectedCount > 0) {
+ logger.info("Success rob taskGroup, taskInstanceId: {},
taskGroupId: {}", taskGroupQueue.getTaskId(),
+ taskGroupQueue.getId());
+ taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+ this.taskGroupQueueMapper.updateById(taskGroupQueue);
+ this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
+ return true;
+ }
}
- logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId:
{}", taskGroupQueue.getTaskId(),
- taskGroupQueue.getId());
+ logger.info("Failed to rob taskGroup, taskGroupQueue: {}",
taskGroupQueue);
return false;
}
@@ -2551,23 +2565,24 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueueMapper.updateById(taskGroupQueue);
}
- /**
- * insert into task group queue
- *
- * @param taskId task id
- * @param taskName task name
- * @param groupId group id
- * @param processId process id
- * @param priority priority
- * @return inserted task group queue
- */
- @Override
- public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
- String taskName, Integer
groupId,
- Integer processId, Integer
priority, TaskGroupQueueStatus status) {
- TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName,
groupId, processId, priority, status);
- taskGroupQueue.setCreateTime(new Date());
- taskGroupQueue.setUpdateTime(new Date());
+ @Override
+ public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
+ String taskName,
+ Integer taskGroupId,
+ Integer workflowInstanceId,
+ Integer taskGroupPriority,
+ TaskGroupQueueStatus
status) {
+ Date now = new Date();
+ TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder()
+ .taskId(taskInstanceId)
+ .taskName(taskName)
+ .groupId(taskGroupId)
+ .processId(workflowInstanceId)
+ .priority(taskGroupPriority)
+ .status(status)
+ .createTime(now)
+ .updateTime(now)
+ .build();
taskGroupQueueMapper.insert(taskGroupQueue);
return taskGroupQueue;
}