This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.3-prepare by this push:
new 1796577b53 Fix task group might be rob by multiple task #13115
1796577b53 is described below
commit 1796577b53ec6ef5e6e454b174167f1a69c2711e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Dec 11 22:58:21 2022 +0800
Fix task group might be rob by multiple task #13115
---
.../api/service/impl/TaskGroupServiceImpl.java | 15 +++--
.../api/service/TaskGroupServiceTest.java | 10 ++-
.../dolphinscheduler/dao/entity/TaskGroup.java | 25 ++-----
.../dao/entity/TaskGroupQueue.java | 20 ++----
.../dao/mapper/TaskGroupMapper.java | 12 ++--
.../dao/mapper/TaskGroupMapper.xml | 10 +--
.../service/process/ProcessService.java | 7 +-
.../service/process/ProcessServiceImpl.java | 77 +++++++++++++---------
8 files changed, 92 insertions(+), 84 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index f8da83a92b..61d4361214 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -108,11 +108,18 @@ public class TaskGroupServiceImpl extends BaseServiceImpl
implements TaskGroupSe
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
- TaskGroup taskGroup = new TaskGroup(name, projectCode, description,
- groupSize, loginUser.getId(), Flag.YES.getCode());
+ Date now = new Date();
+ TaskGroup taskGroup = TaskGroup.builder()
+ .name(name)
+ .projectCode(projectCode)
+ .description(description)
+ .groupSize(groupSize)
+ .userId(loginUser.getId())
+ .status(Flag.YES.getCode())
+ .createTime(now)
+ .updateTime(now)
+ .build();
- taskGroup.setCreateTime(new Date());
- taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) {
permissionPostHandle(AuthorizationType.TASK_GROUP,
loginUser.getId(),
Collections.singletonList(taskGroup.getId()), logger);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
index 76c16689b4..b357fac7f9 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
@@ -100,8 +100,14 @@ public class TaskGroupServiceTest {
}
private TaskGroup getTaskGroup() {
- TaskGroup taskGroup = new TaskGroup(taskGroupName,0, taskGroupDesc,
- 100, 1,1);
+ TaskGroup taskGroup = TaskGroup.builder()
+ .name(taskGroupName)
+ .projectCode(0)
+ .description(taskGroupDesc)
+ .groupSize(100)
+ .userId(1)
+ .status(Flag.YES.getCode())
+ .build();
return taskGroup;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
index dcc9ad3147..0d42d8480f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
@@ -20,13 +20,19 @@ package org.apache.dolphinscheduler.dao.entity;
import java.io.Serializable;
import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {
@@ -70,23 +76,4 @@ public class TaskGroup implements Serializable {
*/
private long projectCode;
- public TaskGroup(String name, long projectCode, String description, int
groupSize, int userId, int status) {
- this.name = name;
- this.projectCode = projectCode;
- this.description = description;
- this.groupSize = groupSize;
- this.userId = userId;
- this.status = status;
- init();
-
- }
-
- public TaskGroup() {
- init();
- }
-
- public void init() {
- this.status = 1;
- this.useSize = 0;
- }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
index be674856c0..9302d141e3 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
@@ -22,7 +22,10 @@ import
org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable;
import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@@ -30,6 +33,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
@TableName("t_ds_task_group_queue")
public class TaskGroupQueue implements Serializable {
@@ -95,18 +101,4 @@ public class TaskGroupQueue implements Serializable {
* update time
*/
private Date updateTime;
-
- public TaskGroupQueue() {
-
- }
-
- public TaskGroupQueue(int taskId, String taskName, int groupId, int
processId, int priority,
- TaskGroupQueueStatus status) {
- this.taskId = taskId;
- this.taskName = taskName;
- this.groupId = groupId;
- this.processId = processId;
- this.priority = priority;
- this.status = status;
- }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index 05c99194ce..e21f701bbe 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -35,14 +35,10 @@ import java.util.List;
*/
public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
- /**
- * compard and set to update table of task group
- *
- * @param id primary key
- * @return affected rows
- */
- int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int
queueId,
- @Param("queueStatus") int queueStatus);
+ int robTaskGroupResource(@Param("id") int id,
+ @Param("currentUseSize") int currentUseSize,
+ @Param("queueId") int queueId,
+ @Param("queueStatus") int queueStatus);
/**
* update table of task group
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
index 7d3fa96b11..969077309e 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
@@ -74,11 +74,13 @@
</select>
<!--modify data by id-->
- <update id="updateTaskGroupResource">
+ <update id="robTaskGroupResource">
update t_ds_task_group
- set use_size = use_size+1
- where id = #{id} and use_size < group_size and
- (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and
status = #{queueStatus} ) = 1
+ set use_size = use_size + 1
+ where id = #{id}
+ and use_size < group_size
+ and use_size = #{currentUseSize}
+ and (select count(1) FROM t_ds_task_group_queue where id =
#{queueId} and status = #{queueStatus}) = 1
</update>
<!--modify data by id-->
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7b10126188..33aa1a5047 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -281,8 +281,11 @@ public interface ProcessService {
void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
- String taskName, Integer groupId,
- Integer processId, Integer
priority, TaskGroupQueueStatus status);
+ String taskName,
+ Integer groupId,
+ Integer processId,
+ Integer priority,
+ TaskGroupQueueStatus status);
int updateTaskGroupQueueStatus(Integer taskId, int status);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index f8621c4afe..e580f1f74d 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2944,7 +2944,12 @@ public class ProcessServiceImpl implements
ProcessService {
// Create a waiting taskGroupQueue, after acquire resource, we can
update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) {
- taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName,
groupId, processId, priority,
+ taskGroupQueue = insertIntoTaskGroupQueue(
+ taskId,
+ taskName,
+ groupId,
+ processId,
+ priority,
TaskGroupQueueStatus.WAIT_QUEUE);
} else {
logger.info("The task queue is already exist, taskId: {}", taskId);
@@ -2956,7 +2961,9 @@ public class ProcessServiceImpl implements ProcessService
{
this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
// check if there already exist higher priority tasks
- List<TaskGroupQueue> highPriorityTasks =
taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority,
+ List<TaskGroupQueue> highPriorityTasks =
taskGroupQueueMapper.queryHighPriorityTasks(
+ groupId,
+ priority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false;
@@ -2977,20 +2984,27 @@ public class ProcessServiceImpl implements
ProcessService {
*/
@Override
public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
- TaskGroup taskGroup =
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
- int affectedCount =
taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
- taskGroupQueue.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode());
- if (affectedCount > 0) {
- logger.info("Success rob taskGroup, taskInstanceId: {},
taskGroupId: {}", taskGroupQueue.getTaskId(),
- taskGroupQueue.getId());
- taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
- this.taskGroupQueueMapper.updateById(taskGroupQueue);
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
- return true;
+ // set the default max size to avoid dead loop
+ for (int i = 0; i < 10; i++) {
+ TaskGroup taskGroup =
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
+ if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
+ logger.info("The current task Group is full, taskGroup: {}",
taskGroup);
+ return false;
+ }
+ int affectedCount =
taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
+ taskGroup.getUseSize(),
+ taskGroupQueue.getId(),
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+ if (affectedCount > 0) {
+ logger.info("Success rob taskGroup, taskInstanceId: {},
taskGroupId: {}", taskGroupQueue.getTaskId(),
+ taskGroupQueue.getId());
+ taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+ this.taskGroupQueueMapper.updateById(taskGroupQueue);
+ this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
+ return true;
+ }
}
- logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId:
{}", taskGroupQueue.getTaskId(),
- taskGroupQueue.getId());
+ logger.info("Failed to rob taskGroup, taskGroupQueue: {}",
taskGroupQueue);
return false;
}
@@ -3071,23 +3085,24 @@ public class ProcessServiceImpl implements
ProcessService {
taskGroupQueueMapper.updateById(taskGroupQueue);
}
- /**
- * insert into task group queue
- *
- * @param taskId task id
- * @param taskName task name
- * @param groupId group id
- * @param processId process id
- * @param priority priority
- * @return inserted task group queue
- */
@Override
- public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
- String taskName, Integer
groupId,
- Integer processId, Integer
priority, TaskGroupQueueStatus status) {
- TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName,
groupId, processId, priority, status);
- taskGroupQueue.setCreateTime(new Date());
- taskGroupQueue.setUpdateTime(new Date());
+ public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
+ String taskName,
+ Integer taskGroupId,
+ Integer workflowInstanceId,
+ Integer taskGroupPriority,
+ TaskGroupQueueStatus
status) {
+ Date now = new Date();
+ TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder()
+ .taskId(taskInstanceId)
+ .taskName(taskName)
+ .groupId(taskGroupId)
+ .processId(workflowInstanceId)
+ .priority(taskGroupPriority)
+ .status(status)
+ .createTime(now)
+ .updateTime(now)
+ .build();
taskGroupQueueMapper.insert(taskGroupQueue);
return taskGroupQueue;
}