healchow commented on code in PR #7578:
URL: https://github.com/apache/inlong/pull/7578#discussion_r1133487231
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -338,31 +334,25 @@ private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         // find those node whose tag match stream_source tag and agent ip match stream_source agent ip
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentClusterName);
-        sourceEntities.stream()
-                .filter(sourceEntity -> sourceEntity.getTemplateId() == null) // only apply template task
-                .map(sourceEntity -> {
-                    List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
-                    Optional<StreamSourceEntity> optionalSource = subSources.stream()
-                            .filter(subSource -> subSource.getAgentIp().equals(agentIp))
-                            .findAny();
-                    return Pair.<StreamSourceEntity, Optional>of(sourceEntity, optionalSource);
-                }).filter(parAndSonEntity -> !parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
-                .forEach(parAndSonEntity -> {
-                    // if not, clone a subtask for this Agent.
-                    // note: a new source name with random suffix is generated to adhere to the unique constraint
-                    StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
-                    StreamSourceEntity fileEntity =
-                            CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
-                    fileEntity.setSourceName(fileEntity.getSourceName() + "-"
-                            + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                    fileEntity.setTemplateId(sourceEntity.getId());
-                    fileEntity.setAgentIp(agentIp);
-                    fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    // create new sub source task
-                    sourceMapper.insert(fileEntity);
-                    LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
-                            fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
-                });
+        sourceEntities.forEach(sourceEntity -> {
+            StreamSourceEntity subSource = sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),
Review Comment:
Are you sure this is the code that causes the high CPU usage?
Only one query is made here; if it returns too much data, that should lead to
excessive memory usage, which in turn triggers JVM garbage collection.
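To make the memory concern concrete: a single unbounded query materializes every matching row in one `List` before any work starts. Paging is one common way to bound that. The following is a minimal sketch that assumes a hypothetical paged query `fetchPage(offset, limit)`, which is not an existing InLong mapper method. It uses offset paging for brevity. Keyset paging on `id` would be safer if rows change during the scan.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Illustrative sketch only; not part of InLong. */
public final class PagedScan {

    private PagedScan() {
    }

    /**
     * Passes every row to {@code handler}, fetching at most {@code pageSize}
     * rows per query, and returns how many queries were issued.
     *
     * @param fetchPage hypothetical paged query: (offset, limit) -> rows
     */
    public static <T> int forEachPaged(BiFunction<Integer, Integer, List<T>> fetchPage,
                                       int pageSize, Consumer<T> handler) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int offset = 0;
        int queries = 0;
        while (true) {
            List<T> page = fetchPage.apply(offset, pageSize);
            queries++;
            page.forEach(handler); // the scan itself holds only the current page
            if (page.size() < pageSize) {
                return queries;    // a short or empty page means no rows remain
            }
            offset += pageSize;
        }
    }
}
```

For example, 25 rows with `pageSize = 10` are read in 3 queries (10, 10 and 5 rows), and the scan never holds more than 10 rows at once.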
Querying one by one does avoid the oversized result set, but the method then
loops many more times and hits the DB frequently, which puts too much pressure
on the DB and takes too long.
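A middle ground between one large query and one query per template is to look up existing sub-sources in chunks of template IDs. The following is a sketch under several assumptions. It relies on a hypothetical mapper call, passed in as `lookupChunk`, that returns the template IDs in a chunk that already have a sub-source for the current agent. Conceptually this is `... WHERE agent_ip = ? AND template_id IN (...)`. Template IDs are assumed to be `Integer`s. With N templates and chunk size B, the sketch issues ceil(N / B) queries instead of N, and each result set stays bounded by B.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Illustrative sketch only; not part of InLong. */
public final class ChunkedLookup {

    /** Template IDs still missing a sub-source, plus the number of DB queries used. */
    public static final class Result {
        public final List<Integer> missing;
        public final int queries;

        Result(List<Integer> missing, int queries) {
            this.missing = missing;
            this.queries = queries;
        }
    }

    private ChunkedLookup() {
    }

    /**
     * @param lookupChunk hypothetical mapper call: given a chunk of template IDs,
     *                    returns those that already have a sub-source for this agent
     */
    public static Result findTemplatesWithoutSubSource(List<Integer> templateIds, int chunkSize,
            Function<List<Integer>, Set<Integer>> lookupChunk) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Integer> missing = new ArrayList<>();
        int queries = 0;
        for (int from = 0; from < templateIds.size(); from += chunkSize) {
            List<Integer> chunk = templateIds.subList(from, Math.min(from + chunkSize, templateIds.size()));
            Set<Integer> alreadyCloned = lookupChunk.apply(chunk); // one round trip per chunk
            queries++;
            for (Integer id : chunk) {
                if (!alreadyCloned.contains(id)) {
                    missing.add(id); // no sub-source yet: a candidate for cloning
                }
            }
        }
        return new Result(missing, queries);
    }
}
```

With 7 templates and `chunkSize = 3`, the lookup issues three queries. The chunk size is the knob that trades DB round trips against per-query memory.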
--
This is an automated message from the Apache Git Service.