This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ba4dd9b11 [INLONG-7577][Manager] Fix high CPU usage when the number of StreamSource is too large (#7578)
ba4dd9b11 is described below
commit ba4dd9b118131952e4a94a68168913b8f60bcf6c
Author: fuweng11 <[email protected]>
AuthorDate: Mon Mar 13 16:50:38 2023 +0800
[INLONG-7577][Manager] Fix high CPU usage when the number of StreamSource is too large (#7578)
---
.../dao/mapper/StreamSourceEntityMapper.java | 10 ++++
.../resources/mappers/StreamSourceEntityMapper.xml | 9 ++++
.../service/core/impl/AgentServiceImpl.java | 53 +++++++++-------------
3 files changed, 40 insertions(+), 32 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ada588eda..ba7ad89d1 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -41,6 +41,16 @@ public interface StreamSourceEntityMapper {
*/
StreamSourceEntity selectForAgentTask(Integer id);
+ /**
+ * Select one sub source by template id and agent ip.
+ *
+ * @param templateId template id
+ * @param agentIp agent ip
+ * @return stream source info
+ */
+ StreamSourceEntity selectOneByTemplatedIdAndAgentIp(@Param("templateId") Integer templateId,
+ @Param("agentIp") String agentIp);
+
/**
* Query un-deleted sources by the given agentIp.
*/
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 983f5809f..0f6aeb5bb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -97,6 +97,15 @@
where id = #{id,jdbcType=INTEGER}
for update
</select>
+ <select id="selectOneByTemplatedIdAndAgentIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ where template_id = #{templateId,jdbcType=INTEGER}
+ and agent_ip = #{agentIp, jdbcType=VARCHAR}
+ and is_deleted = 0
+ limit 1
+ </select>
<select id="selectCount" resultType="java.lang.Integer">
select count(1)
from stream_source
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 3873ae39b..065831141 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -23,7 +23,6 @@ import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.db.CommandEntity;
@@ -81,7 +80,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -167,9 +165,7 @@ public class AgentServiceImpl implements AgentService {
nextStatus = SourceStatus.SOURCE_FAILED.getCode();
} else if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
// Change the status from 30x to normal / disable / frozen
- if (SourceStatus.TEMP_TO_NORMAL.contains(previousStatus)) {
- nextStatus = SourceStatus.SOURCE_NORMAL.getCode();
- } else if (SourceStatus.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+ if (SourceStatus.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
nextStatus = SourceStatus.SOURCE_DISABLE.getCode();
} else if (SourceStatus.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
nextStatus = SourceStatus.SOURCE_FROZEN.getCode();
@@ -338,31 +334,25 @@ public class AgentServiceImpl implements AgentService {
// find those node whose tag match stream_source tag and agent ip match stream_source agent ip
List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
Lists.newArrayList(SourceType.FILE), agentClusterName);
- sourceEntities.stream()
- .filter(sourceEntity -> sourceEntity.getTemplateId() == null) // only apply template task
- .map(sourceEntity -> {
- List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
- Optional<StreamSourceEntity> optionalSource = subSources.stream()
- .filter(subSource -> subSource.getAgentIp().equals(agentIp))
- .findAny();
- return Pair.<StreamSourceEntity, Optional>of(sourceEntity, optionalSource);
- }).filter(parAndSonEntity -> !parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
- .forEach(parAndSonEntity -> {
- // if not, clone a subtask for this Agent.
- // note: a new source name with random suffix is generated to adhere to the unique constraint
- StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
- StreamSourceEntity fileEntity =
- CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
- fileEntity.setSourceName(fileEntity.getSourceName() + "-"
- + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
- fileEntity.setTemplateId(sourceEntity.getId());
- fileEntity.setAgentIp(agentIp);
- fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
- // create new sub source task
- sourceMapper.insert(fileEntity);
- LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
- fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
- });
+ sourceEntities.forEach(sourceEntity -> {
+ StreamSourceEntity subSource = sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),
+ agentIp);
+ if (subSource == null) {
+ // if not, clone a subtask for this Agent.
+ // note: a new source name with random suffix is generated to adhere to the unique constraint
+ StreamSourceEntity fileEntity =
+ CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
+ fileEntity.setSourceName(fileEntity.getSourceName() + "-"
+ + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
+ fileEntity.setTemplateId(sourceEntity.getId());
+ fileEntity.setAgentIp(agentIp);
+ fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
+ // create new sub source task
+ sourceMapper.insert(fileEntity);
+ LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
+ fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
+ }
+ });
}
/**
@@ -410,8 +400,7 @@ public class AgentServiceImpl implements AgentService {
SourceStatus.SOURCE_NORMAL,
SourceStatus.TO_BE_ISSUED_ADD,
SourceStatus.TO_BE_ISSUED_ACTIVE);
- Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
- StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
+ Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(StreamStatus.SUSPENDED);
if (matchGroup(sourceEntity, clusterNodeEntity)
&& !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
&& !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {