This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.3-prepare by this push:
     new 1796577b53 Fix task group might be robbed by multiple tasks #13115
1796577b53 is described below

commit 1796577b53ec6ef5e6e454b174167f1a69c2711e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Dec 11 22:58:21 2022 +0800

    Fix task group might be robbed by multiple tasks #13115
---
 .../api/service/impl/TaskGroupServiceImpl.java     | 15 +++--
 .../api/service/TaskGroupServiceTest.java          | 10 ++-
 .../dolphinscheduler/dao/entity/TaskGroup.java     | 25 ++-----
 .../dao/entity/TaskGroupQueue.java                 | 20 ++----
 .../dao/mapper/TaskGroupMapper.java                | 12 ++--
 .../dao/mapper/TaskGroupMapper.xml                 | 10 +--
 .../service/process/ProcessService.java            |  7 +-
 .../service/process/ProcessServiceImpl.java        | 77 +++++++++++++---------
 8 files changed, 92 insertions(+), 84 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index f8da83a92b..61d4361214 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -108,11 +108,18 @@ public class TaskGroupServiceImpl extends BaseServiceImpl 
implements TaskGroupSe
             putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
             return result;
         }
-        TaskGroup taskGroup = new TaskGroup(name, projectCode, description,
-                groupSize, loginUser.getId(), Flag.YES.getCode());
+        Date now = new Date();
+        TaskGroup taskGroup = TaskGroup.builder()
+                .name(name)
+                .projectCode(projectCode)
+                .description(description)
+                .groupSize(groupSize)
+                .userId(loginUser.getId())
+                .status(Flag.YES.getCode())
+                .createTime(now)
+                .updateTime(now)
+                .build();
 
-        taskGroup.setCreateTime(new Date());
-        taskGroup.setUpdateTime(new Date());
         if (taskGroupMapper.insert(taskGroup) > 0) {
             permissionPostHandle(AuthorizationType.TASK_GROUP, 
loginUser.getId(),
                     Collections.singletonList(taskGroup.getId()), logger);
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
index 76c16689b4..b357fac7f9 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
@@ -100,8 +100,14 @@ public class TaskGroupServiceTest {
     }
 
     private TaskGroup getTaskGroup() {
-        TaskGroup taskGroup = new TaskGroup(taskGroupName,0, taskGroupDesc,
-            100, 1,1);
+        TaskGroup taskGroup = TaskGroup.builder()
+                .name(taskGroupName)
+                .projectCode(0)
+                .description(taskGroupDesc)
+                .groupSize(100)
+                .userId(1)
+                .status(Flag.YES.getCode())
+                .build();
         return taskGroup;
     }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
index dcc9ad3147..0d42d8480f 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
@@ -20,13 +20,19 @@ package org.apache.dolphinscheduler.dao.entity;
 import java.io.Serializable;
 import java.util.Date;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 
 @Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
 @TableName("t_ds_task_group")
 public class TaskGroup implements Serializable {
 
@@ -70,23 +76,4 @@ public class TaskGroup implements Serializable {
      */
     private long projectCode;
 
-    public TaskGroup(String name, long projectCode, String description, int 
groupSize, int userId, int status) {
-        this.name = name;
-        this.projectCode = projectCode;
-        this.description = description;
-        this.groupSize = groupSize;
-        this.userId = userId;
-        this.status = status;
-        init();
-
-    }
-
-    public TaskGroup() {
-        init();
-    }
-
-    public void init() {
-        this.status = 1;
-        this.useSize = 0;
-    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
index be674856c0..9302d141e3 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
@@ -22,7 +22,10 @@ import 
org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import java.io.Serializable;
 import java.util.Date;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
@@ -30,6 +33,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 @TableName("t_ds_task_group_queue")
 public class TaskGroupQueue implements Serializable {
 
@@ -95,18 +101,4 @@ public class TaskGroupQueue implements Serializable {
      * update time
      */
     private Date updateTime;
-
-    public TaskGroupQueue() {
-
-    }
-
-    public TaskGroupQueue(int taskId, String taskName, int groupId, int 
processId, int priority,
-                          TaskGroupQueueStatus status) {
-        this.taskId = taskId;
-        this.taskName = taskName;
-        this.groupId = groupId;
-        this.processId = processId;
-        this.priority = priority;
-        this.status = status;
-    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index 05c99194ce..e21f701bbe 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -35,14 +35,10 @@ import java.util.List;
  */
 public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
 
-    /**
-     * compard and set to update table of task group
-     *
-     * @param id primary key
-     * @return affected rows
-     */
-    int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int 
queueId,
-                                @Param("queueStatus") int queueStatus);
+    int robTaskGroupResource(@Param("id") int id,
+                             @Param("currentUseSize") int currentUseSize,
+                             @Param("queueId") int queueId,
+                             @Param("queueStatus") int queueStatus);
 
     /**
      * update table of task group
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
index 7d3fa96b11..969077309e 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
@@ -74,11 +74,13 @@
     </select>
 
     <!--modify data by id-->
-    <update id="updateTaskGroupResource">
+    <update id="robTaskGroupResource">
         update t_ds_task_group
-           set use_size = use_size+1
-        where id = #{id} and use_size  &lt; group_size and
-         (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and 
status = #{queueStatus} ) = 1
+        set use_size = use_size + 1
+        where id = #{id}
+          and use_size &lt; group_size
+          and use_size = #{currentUseSize}
+          and (select count(1) FROM t_ds_task_group_queue where id = 
#{queueId} and status = #{queueStatus}) = 1
     </update>
 
     <!--modify data by id-->
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7b10126188..33aa1a5047 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -281,8 +281,11 @@ public interface ProcessService {
     void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
 
     TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
-                                            String taskName, Integer groupId,
-                                            Integer processId, Integer 
priority, TaskGroupQueueStatus status);
+                                            String taskName,
+                                            Integer groupId,
+                                            Integer processId,
+                                            Integer priority,
+                                            TaskGroupQueueStatus status);
 
     int updateTaskGroupQueueStatus(Integer taskId, int status);
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index f8621c4afe..e580f1f74d 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2944,7 +2944,12 @@ public class ProcessServiceImpl implements 
ProcessService {
         // Create a waiting taskGroupQueue, after acquire resource, we can 
update the status to ACQUIRE_SUCCESS
         TaskGroupQueue taskGroupQueue = 
this.taskGroupQueueMapper.queryByTaskId(taskId);
         if (taskGroupQueue == null) {
-            taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, 
groupId, processId, priority,
+            taskGroupQueue = insertIntoTaskGroupQueue(
+                    taskId,
+                    taskName,
+                    groupId,
+                    processId,
+                    priority,
                     TaskGroupQueueStatus.WAIT_QUEUE);
         } else {
             logger.info("The task queue is already exist, taskId: {}", taskId);
@@ -2956,7 +2961,9 @@ public class ProcessServiceImpl implements ProcessService 
{
             this.taskGroupQueueMapper.updateById(taskGroupQueue);
         }
         // check if there already exist higher priority tasks
-        List<TaskGroupQueue> highPriorityTasks = 
taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority,
+        List<TaskGroupQueue> highPriorityTasks = 
taskGroupQueueMapper.queryHighPriorityTasks(
+                groupId,
+                priority,
                 TaskGroupQueueStatus.WAIT_QUEUE.getCode());
         if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
             return false;
@@ -2977,20 +2984,27 @@ public class ProcessServiceImpl implements 
ProcessService {
      */
     @Override
     public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
-        TaskGroup taskGroup = 
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
-        int affectedCount = 
taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
-                taskGroupQueue.getId(),
-                TaskGroupQueueStatus.WAIT_QUEUE.getCode());
-        if (affectedCount > 0) {
-            logger.info("Success rob taskGroup, taskInstanceId: {}, 
taskGroupId: {}", taskGroupQueue.getTaskId(),
-                    taskGroupQueue.getId());
-            taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
-            this.taskGroupQueueMapper.updateById(taskGroupQueue);
-            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), 
taskGroupQueue.getId());
-            return true;
+        // cap the number of retry attempts to avoid an infinite loop
+        for (int i = 0; i < 10; i++) {
+            TaskGroup taskGroup = 
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
+            if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
+                logger.info("The current task Group is full, taskGroup: {}", 
taskGroup);
+                return false;
+            }
+            int affectedCount = 
taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
+                    taskGroup.getUseSize(),
+                    taskGroupQueue.getId(),
+                    TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+            if (affectedCount > 0) {
+                logger.info("Success rob taskGroup, taskInstanceId: {}, 
taskGroupId: {}", taskGroupQueue.getTaskId(),
+                        taskGroupQueue.getId());
+                taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+                this.taskGroupQueueMapper.updateById(taskGroupQueue);
+                this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), 
taskGroupQueue.getId());
+                return true;
+            }
         }
-        logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: 
{}", taskGroupQueue.getTaskId(),
-                taskGroupQueue.getId());
+        logger.info("Failed to rob taskGroup, taskGroupQueue: {}", 
taskGroupQueue);
         return false;
     }
 
@@ -3071,23 +3085,24 @@ public class ProcessServiceImpl implements 
ProcessService {
         taskGroupQueueMapper.updateById(taskGroupQueue);
     }
 
-    /**
-     * insert into task group queue
-     *
-     * @param taskId    task id
-     * @param taskName  task name
-     * @param groupId   group id
-     * @param processId process id
-     * @param priority  priority
-     * @return inserted task group queue
-     */
     @Override
-    public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
-                                                   String taskName, Integer 
groupId,
-                                                   Integer processId, Integer 
priority, TaskGroupQueueStatus status) {
-        TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, 
groupId, processId, priority, status);
-        taskGroupQueue.setCreateTime(new Date());
-        taskGroupQueue.setUpdateTime(new Date());
+    public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
+                                                   String taskName,
+                                                   Integer taskGroupId,
+                                                   Integer workflowInstanceId,
+                                                   Integer taskGroupPriority,
+                                                   TaskGroupQueueStatus 
status) {
+        Date now = new Date();
+        TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder()
+                .taskId(taskInstanceId)
+                .taskName(taskName)
+                .groupId(taskGroupId)
+                .processId(workflowInstanceId)
+                .priority(taskGroupPriority)
+                .status(status)
+                .createTime(now)
+                .updateTime(now)
+                .build();
         taskGroupQueueMapper.insert(taskGroupQueue);
         return taskGroupQueue;
     }

Reply via email to