fuweng11 commented on code in PR #7578:
URL: https://github.com/apache/inlong/pull/7578#discussion_r1133521905


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -338,31 +334,25 @@ private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         // find those node whose tag match stream_source tag and agent ip match stream_source agent ip
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentClusterName);
-        sourceEntities.stream()
-                .filter(sourceEntity -> sourceEntity.getTemplateId() == null) // only apply template task
-                .map(sourceEntity -> {
-                    List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
-                    Optional<StreamSourceEntity> optionalSource = subSources.stream()
-                            .filter(subSource -> subSource.getAgentIp().equals(agentIp))
-                            .findAny();
-                    return Pair.<StreamSourceEntity, Optional>of(sourceEntity, optionalSource);
-                }).filter(parAndSonEntity -> !parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
-                .forEach(parAndSonEntity -> {
-                    // if not, clone a subtask for this Agent.
-                    // note: a new source name with random suffix is generated to adhere to the unique constraint
-                    StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
-                    StreamSourceEntity fileEntity =
-                            CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
-                    fileEntity.setSourceName(fileEntity.getSourceName() + "-"
-                            + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                    fileEntity.setTemplateId(sourceEntity.getId());
-                    fileEntity.setAgentIp(agentIp);
-                    fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    // create new sub source task
-                    sourceMapper.insert(fileEntity);
-                    LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
-                            fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
-                });
+        sourceEntities.forEach(sourceEntity -> {
+            StreamSourceEntity subSource = sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),

Review Comment:
   The previous code queried every StreamSource under the template ID and then filtered them in memory.
   With 200 agents, each call to this method would create 500+ StreamSource instances, causing heavy GC pressure.
   The filtering is now done directly in the MySQL layer, so far fewer instances are created and the GC load goes away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
