This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a399d  [INLONG-3310][Manager] Optimize Pessimistic Lock for select 
stream sources (#3311)
62a399d is described below

commit 62a399dcb8b9bc6dabeaebd37f3c1bb5ddc0bce9
Author: kipshi <[email protected]>
AuthorDate: Wed Mar 23 09:43:57 2022 +0800

    [INLONG-3310][Manager] Optimize Pessimistic Lock for select stream sources 
(#3311)
---
 .../dao/mapper/StreamSourceEntityMapper.java       | 16 +++++++++++--
 .../resources/mappers/StreamSourceEntityMapper.xml | 28 ++++++++++++++++++++++
 .../service/core/impl/AgentServiceImpl.java        |  5 ++--
 .../service/core/impl/InlongStreamServiceImpl.java |  2 +-
 .../source/AbstractStreamSourceOperation.java      |  4 ++++
 5 files changed, 50 insertions(+), 5 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index e6a7d68..0eee410 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -64,17 +64,29 @@ public interface StreamSourceEntityMapper {
             @Param("streamId") String streamId, @Param("sourceType") String 
sourceType);
 
     /**
-     * Query the tasks that need to be added.
+     * Query the tasks that need to be added for update.
      */
     List<StreamSourceEntity> selectByStatusForUpdate(@Param("list") 
List<Integer> list);
 
+
     /**
-     * Query the sources with status 20x by the given agent IP and agent UUID.
+     * Query the tasks that need to be added.
+     */
+    List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list);
+
+    /**
+     * Query the sources with status 20x by the given agent IP and agent UUID 
for update.
      */
     List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("statusList") 
List<Integer> statusList,
             @Param("agentIp") String agentIp, @Param("uuid") String uuid);
 
     /**
+     * Query the sources with status 20x by the given agent IP and agent UUID.
+     */
+    List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") 
List<Integer> statusList,
+            @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+
+    /**
      * Get the distinct source type from the given groupId and streamId
      */
     List<String> selectSourceType(@Param("groupId") String groupId, 
@Param("streamId") String streamId);
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index a76a1d5..be1c3c0 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -175,6 +175,18 @@
             limit 2 for update
         </where>
     </select>
+    <select id="selectByStatus" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where>
+            is_deleted = 0
+            and status in
+            <foreach item="item" index="index" collection="list" open="(" 
close=")" separator=",">
+                #{item}
+            </foreach>
+        </where>
+    </select>
     <select id="selectByStatusAndIpForUpdate" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
@@ -192,6 +204,22 @@
             for update
         </where>
     </select>
+    <select id="selectByStatusAndIp" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where>
+            is_deleted = 0
+            and status in
+            <foreach item="item" index="index" collection="statusList" 
open="(" close=")" separator=",">
+                #{item}
+            </foreach>
+            and agent_ip = #{agentIp, jdbcType=VARCHAR}
+            <if test="uuid != null and uuid != ''">
+                and uuid = #{uuid, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
     <select id="selectSourceType" resultType="java.lang.String">
         select distinct (source_type)
         from stream_source
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 2f61f30..7f67bed 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -174,7 +174,7 @@ public class AgentServiceImpl implements AgentService {
         // Query the tasks that needed to add or active - without agentIp and 
uuid
         List<Integer> addedStatusList = 
Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
                 SourceState.TO_BE_ISSUED_ACTIVE.getCode());
-        List<StreamSourceEntity> entityList = 
sourceMapper.selectByStatusForUpdate(addedStatusList);
+        List<StreamSourceEntity> entityList = 
sourceMapper.selectByStatus(addedStatusList);
 
         String agentIp = request.getAgentIp();
         String uuid = request.getUuid();
@@ -183,13 +183,14 @@ public class AgentServiceImpl implements AgentService {
                 SourceState.TO_BE_ISSUED_RETRY.getCode(), 
SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
                 SourceState.TO_BE_ISSUED_FROZEN.getCode(), 
SourceState.TO_BE_ISSUED_CHECK.getCode(),
                 SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), 
SourceState.TO_BE_ISSUED_MAKEUP.getCode());
-        List<StreamSourceEntity> addedList = 
sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp, uuid);
+        List<StreamSourceEntity> addedList = 
sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
         entityList.addAll(addedList);
 
         List<DataConfig> dataConfigs = Lists.newArrayList();
         for (StreamSourceEntity entity : entityList) {
             // Change 20x to 30x
             int id = entity.getId();
+            entity = sourceMapper.selectByIdForUpdate(id);
             int status = entity.getStatus();
             int op = status % MODULUS_100;
             if (status / MODULUS_100 == UNISSUED_STATUS) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index cc3d97f..8315922 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -246,7 +246,7 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
             this.updateField(groupId, streamId, streamInfo.getFieldList());
         }
 
-        LOGGER.info("success to update inlong group for groupId={}", groupId);
+        LOGGER.info("success to update inlong stream for groupId={}", groupId);
         return true;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index c82787f..918ce16 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.source;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -33,12 +34,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.validation.constraints.NotNull;
 import java.util.Date;
 import java.util.List;
 
+@Slf4j
 public abstract class AbstractStreamSourceOperation implements 
StreamSourceOperation {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
@@ -99,6 +102,7 @@ public abstract class AbstractStreamSourceOperation 
implements StreamSourceOpera
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.NOT_SUPPORTED)
     public SourceResponse getById(@NotNull Integer id) {
         StreamSourceEntity entity = sourceMapper.selectById(id);
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

Reply via email to