This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ba4dd9b11 [INLONG-7577][Manager] Fix high CPU usage when the number of 
StreamSource is too large (#7578)
ba4dd9b11 is described below

commit ba4dd9b118131952e4a94a68168913b8f60bcf6c
Author: fuweng11 <[email protected]>
AuthorDate: Mon Mar 13 16:50:38 2023 +0800

    [INLONG-7577][Manager] Fix high CPU usage when the number of StreamSource 
is too large (#7578)
---
 .../dao/mapper/StreamSourceEntityMapper.java       | 10 ++++
 .../resources/mappers/StreamSourceEntityMapper.xml |  9 ++++
 .../service/core/impl/AgentServiceImpl.java        | 53 +++++++++-------------
 3 files changed, 40 insertions(+), 32 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ada588eda..ba7ad89d1 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -41,6 +41,16 @@ public interface StreamSourceEntityMapper {
      */
     StreamSourceEntity selectForAgentTask(Integer id);
 
+    /**
+     * Select one sub source by template id and agent ip.
+     *
+     * @param templateId template id
+     * @param agentIp agent ip
+     * @return stream source info
+     */
+    StreamSourceEntity selectOneByTemplatedIdAndAgentIp(@Param("templateId") 
Integer templateId,
+            @Param("agentIp") String agentIp);
+
     /**
      * Query un-deleted sources by the given agentIp.
      */
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 983f5809f..0f6aeb5bb 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -97,6 +97,15 @@
         where id = #{id,jdbcType=INTEGER}
         for update
     </select>
+    <select id="selectOneByTemplatedIdAndAgentIp" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        where template_id = #{templateId,jdbcType=INTEGER}
+        and agent_ip = #{agentIp, jdbcType=VARCHAR}
+        and is_deleted = 0
+        limit 1
+    </select>
     <select id="selectCount" resultType="java.lang.Integer">
         select count(1)
         from stream_source
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 3873ae39b..065831141 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -23,7 +23,6 @@ import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.db.CommandEntity;
@@ -81,7 +80,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -167,9 +165,7 @@ public class AgentServiceImpl implements AgentService {
             nextStatus = SourceStatus.SOURCE_FAILED.getCode();
         } else if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
             // Change the status from 30x to normal / disable / frozen
-            if (SourceStatus.TEMP_TO_NORMAL.contains(previousStatus)) {
-                nextStatus = SourceStatus.SOURCE_NORMAL.getCode();
-            } else if (SourceStatus.BEEN_ISSUED_DELETE.getCode() == 
previousStatus) {
+            if (SourceStatus.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
                 nextStatus = SourceStatus.SOURCE_DISABLE.getCode();
             } else if (SourceStatus.BEEN_ISSUED_FROZEN.getCode() == 
previousStatus) {
                 nextStatus = SourceStatus.SOURCE_FROZEN.getCode();
@@ -338,31 +334,25 @@ public class AgentServiceImpl implements AgentService {
         // find those node whose tag match stream_source tag and agent ip 
match stream_source agent ip
         List<StreamSourceEntity> sourceEntities = 
sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentClusterName);
-        sourceEntities.stream()
-                .filter(sourceEntity -> sourceEntity.getTemplateId() == null) 
// only apply template task
-                .map(sourceEntity -> {
-                    List<StreamSourceEntity> subSources = 
sourceMapper.selectByTemplateId(sourceEntity.getId());
-                    Optional<StreamSourceEntity> optionalSource = 
subSources.stream()
-                            .filter(subSource -> 
subSource.getAgentIp().equals(agentIp))
-                            .findAny();
-                    return Pair.<StreamSourceEntity, Optional>of(sourceEntity, 
optionalSource);
-                }).filter(parAndSonEntity -> 
!parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
-                .forEach(parAndSonEntity -> {
-                    // if not, clone a subtask for this Agent.
-                    // note: a new source name with random suffix is generated 
to adhere to the unique constraint
-                    StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
-                    StreamSourceEntity fileEntity =
-                            CommonBeanUtils.copyProperties(sourceEntity, 
StreamSourceEntity::new);
-                    fileEntity.setSourceName(fileEntity.getSourceName() + "-"
-                            + 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                    fileEntity.setTemplateId(sourceEntity.getId());
-                    fileEntity.setAgentIp(agentIp);
-                    
fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    // create new sub source task
-                    sourceMapper.insert(fileEntity);
-                    LOGGER.info("Transform new template task({}) for agent({}) 
in cluster({}).",
-                            fileEntity.getId(), taskRequest.getAgentIp(), 
taskRequest.getClusterName());
-                });
+        sourceEntities.forEach(sourceEntity -> {
+            StreamSourceEntity subSource = 
sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),
+                    agentIp);
+            if (subSource == null) {
+                // if not, clone a subtask for this Agent.
+                // note: a new source name with random suffix is generated to 
adhere to the unique constraint
+                StreamSourceEntity fileEntity =
+                        CommonBeanUtils.copyProperties(sourceEntity, 
StreamSourceEntity::new);
+                fileEntity.setSourceName(fileEntity.getSourceName() + "-"
+                        + 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
+                fileEntity.setTemplateId(sourceEntity.getId());
+                fileEntity.setAgentIp(agentIp);
+                fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
+                // create new sub source task
+                sourceMapper.insert(fileEntity);
+                LOGGER.info("Transform new template task({}) for agent({}) in 
cluster({}).",
+                        fileEntity.getId(), taskRequest.getAgentIp(), 
taskRequest.getClusterName());
+            }
+        });
     }
 
     /**
@@ -410,8 +400,7 @@ public class AgentServiceImpl implements AgentService {
                     SourceStatus.SOURCE_NORMAL,
                     SourceStatus.TO_BE_ISSUED_ADD,
                     SourceStatus.TO_BE_ISSUED_ACTIVE);
-            Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
-                    StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
+            Set<StreamStatus> exceptedMatchedStreamStatus = 
Sets.newHashSet(StreamStatus.SUSPENDED);
             if (matchGroup(sourceEntity, clusterNodeEntity)
                     && 
!exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
                     && 
!exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus())))
 {

Reply via email to