This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 62a399d [INLONG-3310][Manager] Optimize Pessimistic Lock for select
stream sources (#3311)
62a399d is described below
commit 62a399dcb8b9bc6dabeaebd37f3c1bb5ddc0bce9
Author: kipshi <[email protected]>
AuthorDate: Wed Mar 23 09:43:57 2022 +0800
[INLONG-3310][Manager] Optimize Pessimistic Lock for select stream sources
(#3311)
---
.../dao/mapper/StreamSourceEntityMapper.java | 16 +++++++++++--
.../resources/mappers/StreamSourceEntityMapper.xml | 28 ++++++++++++++++++++++
.../service/core/impl/AgentServiceImpl.java | 5 ++--
.../service/core/impl/InlongStreamServiceImpl.java | 2 +-
.../source/AbstractStreamSourceOperation.java | 4 ++++
5 files changed, 50 insertions(+), 5 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index e6a7d68..0eee410 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -64,17 +64,29 @@ public interface StreamSourceEntityMapper {
@Param("streamId") String streamId, @Param("sourceType") String
sourceType);
/**
- * Query the tasks that need to be added.
+ * Query the tasks that need to be added for update.
*/
List<StreamSourceEntity> selectByStatusForUpdate(@Param("list")
List<Integer> list);
+
/**
- * Query the sources with status 20x by the given agent IP and agent UUID.
+ * Query the tasks that need to be added.
+ */
+ List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list);
+
+ /**
+ * Query the sources with status 20x by the given agent IP and agent UUID
for update.
*/
List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("statusList")
List<Integer> statusList,
@Param("agentIp") String agentIp, @Param("uuid") String uuid);
/**
+ * Query the sources with status 20x by the given agent IP and agent UUID.
+ */
+ List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList")
List<Integer> statusList,
+ @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+
+ /**
* Get the distinct source type from the given groupId and streamId
*/
List<String> selectSourceType(@Param("groupId") String groupId,
@Param("streamId") String streamId);
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index a76a1d5..be1c3c0 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -175,6 +175,18 @@
limit 2 for update
</where>
</select>
+ <select id="selectByStatus"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ is_deleted = 0
+ and status in
+ <foreach item="item" index="index" collection="list" open="("
close=")" separator=",">
+ #{item}
+ </foreach>
+ </where>
+ </select>
<select id="selectByStatusAndIpForUpdate"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
@@ -192,6 +204,22 @@
for update
</where>
</select>
+ <select id="selectByStatusAndIp"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ is_deleted = 0
+ and status in
+ <foreach item="item" index="index" collection="statusList"
open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ and agent_ip = #{agentIp, jdbcType=VARCHAR}
+ <if test="uuid != null and uuid != ''">
+ and uuid = #{uuid, jdbcType=VARCHAR}
+ </if>
+ </where>
+ </select>
<select id="selectSourceType" resultType="java.lang.String">
select distinct (source_type)
from stream_source
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 2f61f30..7f67bed 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -174,7 +174,7 @@ public class AgentServiceImpl implements AgentService {
// Query the tasks that needed to add or active - without agentIp and
uuid
List<Integer> addedStatusList =
Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
SourceState.TO_BE_ISSUED_ACTIVE.getCode());
- List<StreamSourceEntity> entityList =
sourceMapper.selectByStatusForUpdate(addedStatusList);
+ List<StreamSourceEntity> entityList =
sourceMapper.selectByStatus(addedStatusList);
String agentIp = request.getAgentIp();
String uuid = request.getUuid();
@@ -183,13 +183,14 @@ public class AgentServiceImpl implements AgentService {
SourceState.TO_BE_ISSUED_RETRY.getCode(),
SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
SourceState.TO_BE_ISSUED_FROZEN.getCode(),
SourceState.TO_BE_ISSUED_CHECK.getCode(),
SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(),
SourceState.TO_BE_ISSUED_MAKEUP.getCode());
- List<StreamSourceEntity> addedList =
sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp, uuid);
+ List<StreamSourceEntity> addedList =
sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
entityList.addAll(addedList);
List<DataConfig> dataConfigs = Lists.newArrayList();
for (StreamSourceEntity entity : entityList) {
// Change 20x to 30x
int id = entity.getId();
+ entity = sourceMapper.selectByIdForUpdate(id);
int status = entity.getStatus();
int op = status % MODULUS_100;
if (status / MODULUS_100 == UNISSUED_STATUS) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index cc3d97f..8315922 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -246,7 +246,7 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
this.updateField(groupId, streamId, streamInfo.getFieldList());
}
- LOGGER.info("success to update inlong group for groupId={}", groupId);
+ LOGGER.info("success to update inlong stream for groupId={}", groupId);
return true;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index c82787f..918ce16 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.source;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -33,12 +34,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.List;
+@Slf4j
public abstract class AbstractStreamSourceOperation implements
StreamSourceOperation {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
@@ -99,6 +102,7 @@ public abstract class AbstractStreamSourceOperation
implements StreamSourceOpera
}
@Override
+ @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.NOT_SUPPORTED)
public SourceResponse getById(@NotNull Integer id) {
StreamSourceEntity entity = sourceMapper.selectById(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());