This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new ddc1cee3 feat: optimize log processing and configuration delivery 
logic (#637)
ddc1cee3 is described below

commit ddc1cee3da7316116962d4701705a70f18f47a41
Author: wtt <[email protected]>
AuthorDate: Thu Mar 5 18:39:27 2026 +0800

    feat: optimize log processing and configuration delivery logic (#637)
    
    * fix: solve the blocking problem caused by serverless startup
    
    * fix: fix pipeline execution logic error
    
    * feat: optimize log processing and configuration delivery logic
    
    * fix: fix the logic problem when obtaining the list of all agents
    
    * refactor: optimize log collection and configure publishing logic
    
    * fix: correct comment content and adjust code style
    
    * feat: add synchronous sending configuration function and optimize sending 
logic
    
    * refactor: optimize agentMachine default value and collection 
initialization method
---
 ozhera-log/log-agent-server/pom.xml                |   3 +-
 .../server/service/DefaultLogProcessCollector.java | 447 ++++++++++++++++++---
 .../service/DefaultPublishConfigService.java       | 217 ++++++++--
 .../log/server/DefaultLogProcessCollectorTest.java |   4 +-
 ozhera-log/log-agent/pom.xml                       |   2 +-
 .../log/agent/channel/ChannelServiceImpl.java      |   1 +
 ozhera-log/log-api/pom.xml                         |   1 +
 .../log/api/service/PublishConfigService.java      |   2 +
 ozhera-log/log-manager/pom.xml                     |   4 +-
 .../extension/agent/MilogAgentServiceImpl.java     |  23 +-
 .../service/impl/MiLogMetaManageServiceImpl.java   |   7 +-
 11 files changed, 616 insertions(+), 95 deletions(-)

diff --git a/ozhera-log/log-agent-server/pom.xml 
b/ozhera-log/log-agent-server/pom.xml
index 531ac649..691da14c 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -26,7 +26,7 @@ http://www.apache.org/licenses/LICENSE-2.0
         <version>2.2.6-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <version>2.2.8-SNAPSHOT</version>
+    <version>2.2.9-SNAPSHOT</version>
 
     <artifactId>log-agent-server</artifactId>
 
@@ -82,6 +82,7 @@ http://www.apache.org/licenses/LICENSE-2.0
         <dependency>
             <groupId>org.apache.ozhera</groupId>
             <artifactId>log-api</artifactId>
+            <version>2.2.7-SNAPSHOT</version>
         </dependency>
 
         <dependency>
diff --git 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
index 7bc32451..079c6b56 100644
--- 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
+++ 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
@@ -42,6 +42,8 @@ import javax.annotation.Resource;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -61,6 +63,12 @@ public class DefaultLogProcessCollector implements 
LogProcessCollector {
 
     private final Map<String, List<UpdateLogProcessCmd.CollectDetail>> 
tailProgressMap = new ConcurrentHashMap<>(256);
 
+    // Maximum number of stored IPs to prevent unlimited memory growth
+    private static final int MAX_IP_COUNT = 200000;
+
+    // The maximum number of CollectDetails under each IP to prevent excessive 
data for a single IP
+    private static final int MAX_COLLECT_DETAIL_PER_IP = 5000;
+
     private static final Integer MAX_INTERRUPT_TIME = 10;
 
     private static final Integer MAX_STATIC_INTERRUPT_TIME_HOUR = 4;
@@ -70,13 +78,242 @@ public class DefaultLogProcessCollector implements 
LogProcessCollector {
     @Resource
     private RpcServer rpcServer;
 
+    public void init() {
+        // Scheduled cleaning tasks
+        ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread t = new Thread(r, "LogProcessCollector-Cleanup-Thread");
+            t.setDaemon(true);
+            return t;
+        });
+
+        // Every 2 minutes, perform cleanup of expired data
+        
scheduledExecutorService.scheduleWithFixedDelay(this::cleanupExpiredData, 2, 2, 
TimeUnit.MINUTES);
+        log.info("Initialized scheduled cleanup for LogProcessCollector");
+    }
+
+
     @Override
     public void collectLogProcess(UpdateLogProcessCmd cmd) {
         log.debug("[LogProcess.updateLogProcess] cmd:{} ", cmd);
         if (cmd == null || StringUtils.isEmpty(cmd.getIp())) {
             return;
         }
-        tailProgressMap.put(cmd.getIp(), cmd.getCollectList());
+
+        // Deduplicate the collected data to avoid duplicate data
+        List<UpdateLogProcessCmd.CollectDetail> deduplicatedCollectList = 
deduplicateCollectList(cmd.getCollectList());
+
+        // Control the size of the map to prevent memory overflow
+        if (tailProgressMap.size() >= MAX_IP_COUNT) {
+            log.warn("tailProgressMap size reaches limit {}, removing oldest 
entries", MAX_IP_COUNT);
+            // Remove some old entries, keep the latest
+            removeOldestEntries();
+        }
+
+        tailProgressMap.put(cmd.getIp(), deduplicatedCollectList);
+    }
+
+    /**
+     * Deduplicate the collect list
+     */
+    private List<UpdateLogProcessCmd.CollectDetail> 
deduplicateCollectList(List<UpdateLogProcessCmd.CollectDetail> collectList) {
+        if (CollectionUtils.isEmpty(collectList)) {
+            return collectList;
+        }
+
+        // Use LinkedHashSet to keep order and remove duplicates
+        Set<UpdateLogProcessCmd.CollectDetail> uniqueSet = new 
LinkedHashSet<>();
+        for (UpdateLogProcessCmd.CollectDetail detail : collectList) {
+            boolean exists = false;
+            for (UpdateLogProcessCmd.CollectDetail existingDetail : uniqueSet) 
{
+                if (isCollectDetailEqual(existingDetail, detail)) {
+                    exists = true;
+                    // Update with newer data
+                    if (isNewerThan(detail, existingDetail)) {
+                        uniqueSet.remove(existingDetail);
+                        uniqueSet.add(detail);
+                    }
+                    break;
+                }
+            }
+            if (!exists) {
+                // Limit the number of fileProgressDetails in each 
CollectDetail
+                if (detail.getFileProgressDetails() != null &&
+                        detail.getFileProgressDetails().size() > 
MAX_COLLECT_DETAIL_PER_IP) {
+                    // Only keep the latest part of the records
+                    List<UpdateLogProcessCmd.FileProgressDetail> 
limitedDetails =
+                            detail.getFileProgressDetails().stream()
+                                    .sorted((a, b) -> {
+                                        Long timeA = a.getCollectTime();
+                                        Long timeB = b.getCollectTime();
+                                        if (timeA == null && timeB == null) 
return 0;
+                                        if (timeA == null) return -1;
+                                        if (timeB == null) return 1;
+                                        return timeB.compareTo(timeA); // ι™εΊζŽ’εˆ—
+                                    })
+                                    .limit(MAX_COLLECT_DETAIL_PER_IP)
+                                    .collect(Collectors.toList());
+                    detail.setFileProgressDetails(limitedDetails);
+                }
+                uniqueSet.add(detail);
+            }
+        }
+
+        // If the deduplicated data is still too much, only keep the latest 
part
+        List<UpdateLogProcessCmd.CollectDetail> result = new 
ArrayList<>(uniqueSet);
+        if (result.size() > MAX_COLLECT_DETAIL_PER_IP) {
+            result = result.stream()
+                    .sorted((a, b) -> {
+                        // Sort by time, take the latest
+                        List<UpdateLogProcessCmd.FileProgressDetail> detailsA 
= a.getFileProgressDetails();
+                        List<UpdateLogProcessCmd.FileProgressDetail> detailsB 
= b.getFileProgressDetails();
+
+                        Long latestTimeA = getLatestCollectTime(detailsA);
+                        Long latestTimeB = getLatestCollectTime(detailsB);
+
+                        if (latestTimeA == null && latestTimeB == null) {
+                            return 0;
+                        }
+                        if (latestTimeA == null) {
+                            return -1;
+                        }
+                        if (latestTimeB == null) {
+                            return 1;
+                        }
+                        return latestTimeB.compareTo(latestTimeA);
+                    })
+                    .limit(MAX_COLLECT_DETAIL_PER_IP)
+                    .collect(Collectors.toList());
+        }
+
+        return result;
+    }
+
+    /**
+     * Determine if two CollectDetails are equal (based on key fields)
+     */
+    private boolean isCollectDetailEqual(UpdateLogProcessCmd.CollectDetail 
detail1, UpdateLogProcessCmd.CollectDetail detail2) {
+        if (detail1 == detail2) return true;
+        if (detail1 == null || detail2 == null) return false;
+
+        // Compare key fields
+        return Objects.equals(detail1.getTailId(), detail2.getTailId()) &&
+                Objects.equals(detail1.getTailName(), detail2.getTailName()) &&
+                Objects.equals(detail1.getPath(), detail2.getPath()) &&
+                Objects.equals(detail1.getAppId(), detail2.getAppId());
+    }
+
+    /**
+     * Determine if detail1 is newer than detail2
+     */
+    private boolean isNewerThan(UpdateLogProcessCmd.CollectDetail detail1, 
UpdateLogProcessCmd.CollectDetail detail2) {
+        Long latestTime1 = 
getLatestCollectTime(detail1.getFileProgressDetails());
+        Long latestTime2 = 
getLatestCollectTime(detail2.getFileProgressDetails());
+
+        if (latestTime1 == null && latestTime2 == null) return false;
+        if (latestTime1 == null) return false;
+        if (latestTime2 == null) return true;
+
+        return latestTime1 > latestTime2;
+    }
+
+    /**
+     * Get the latest collect time from file progress details
+     */
+    private Long 
getLatestCollectTime(List<UpdateLogProcessCmd.FileProgressDetail> details) {
+        if (CollectionUtils.isEmpty(details)) {
+            return null;
+        }
+        return details.stream()
+                .map(UpdateLogProcessCmd.FileProgressDetail::getCollectTime)
+                .filter(Objects::nonNull)
+                .max(Long::compareTo)
+                .orElse(null);
+    }
+
+    /**
+     * Remove oldest entries to control map size
+     */
+    private void removeOldestEntries() {
+        // Get all keys and remove a part of them according to some strategy
+        List<String> allKeys = new ArrayList<>(tailProgressMap.keySet());
+        if (allKeys.size() <= MAX_IP_COUNT / 2) {
+            return; // If the number doesn't exceed half the limit, no removal 
is needed
+        }
+
+        // Simple strategy: remove the latter half of the entries
+        allKeys.sort(String::compareTo);
+
+        int removeCount = allKeys.size() / 4;
+
+        for (int i = 0; i < removeCount && i < allKeys.size(); i++) {
+            tailProgressMap.remove(allKeys.get(i));
+        }
+
+        log.info("Removed {} old entries from tailProgressMap, current size: 
{}", removeCount, tailProgressMap.size());
+    }
+
+    /**
+     * Clean up expired data
+     */
+    private void cleanupExpiredData() {
+        try {
+            int removedCount = 0;
+            Iterator<Map.Entry<String, 
List<UpdateLogProcessCmd.CollectDetail>>> iterator = 
tailProgressMap.entrySet().iterator();
+
+            while (iterator.hasNext()) {
+                Map.Entry<String, List<UpdateLogProcessCmd.CollectDetail>> 
entry = iterator.next();
+                List<UpdateLogProcessCmd.CollectDetail> collectDetails = 
entry.getValue();
+
+                List<UpdateLogProcessCmd.CollectDetail> filteredDetails = new 
ArrayList<>();
+                for (UpdateLogProcessCmd.CollectDetail detail : 
collectDetails) {
+                    List<UpdateLogProcessCmd.FileProgressDetail> 
fileProgressDetails = detail.getFileProgressDetails();
+                    if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
+                        // Filter out data that has not been updated for a 
specified period of time
+                        List<UpdateLogProcessCmd.FileProgressDetail> 
recentDetails = fileProgressDetails.stream()
+                                .filter(fileDetail -> {
+                                    Long collectTime = 
fileDetail.getCollectTime();
+                                    if (collectTime == null) {
+                                        return false;
+                                    }
+                                    return Instant.now().toEpochMilli() - 
collectTime < TimeUnit.MINUTES.toMillis(MAX_INTERRUPT_TIME);
+                                })
+                                .collect(Collectors.toList());
+
+                        if (!recentDetails.isEmpty()) {
+                            UpdateLogProcessCmd.CollectDetail newDetail = new 
UpdateLogProcessCmd.CollectDetail();
+                            newDetail.setTailId(detail.getTailId());
+                            newDetail.setTailName(detail.getTailName());
+                            newDetail.setIpList(detail.getIpList());
+                            newDetail.setPath(detail.getPath());
+                            newDetail.setAppId(detail.getAppId());
+                            newDetail.setAppName(detail.getAppName());
+                            newDetail.setFileProgressDetails(recentDetails);
+                            filteredDetails.add(newDetail);
+                        }
+                    } else {
+                        filteredDetails.add(detail);
+                    }
+                }
+
+                if (filteredDetails.isEmpty()) {
+                    // If there is no valid data after filtering, remove the 
entire entry
+                    iterator.remove();
+                    removedCount++;
+                } else {
+                    entry.setValue(filteredDetails);
+                }
+            }
+
+            if (removedCount > 0) {
+                log.info("Cleaned up {} expired entries from tailProgressMap", 
removedCount);
+            }
+
+            if (tailProgressMap.size() > MAX_IP_COUNT) {
+                removeOldestEntries();
+            }
+        } catch (Exception e) {
+            log.error("Error during cleanupExpiredData", e);
+        }
     }
 
     @Override
@@ -84,30 +321,73 @@ public class DefaultLogProcessCollector implements 
LogProcessCollector {
         if (null == tailId || StringUtils.isBlank(tailName)) {
             return new ArrayList<>();
         }
-        List<TailLogProcessDTO> dtoList = tailProgressMap.values().stream()
-                .flatMap(Collection::stream)
-                .filter(collectDetail -> Objects.equals(tailId.toString(), 
collectDetail.getTailId()))
-                .flatMap(collectDetail -> 
collectDetail.getFileProgressDetails().stream())
-                .map(fileProgressDetail -> TailLogProcessDTO.builder()
-                        .tailName(tailName)
-                        .collectTime(fileProgressDetail.getCollectTime())
-                        
.collectPercentage(fileProgressDetail.getCollectPercentage())
-                        .ip(fileProgressDetail.getConfigIp())
-                        .path(fileProgressDetail.getPattern())
-                        
.fileRowNumber(fileProgressDetail.getFileRowNumber()).build())
-                .filter(processDTO -> 
StringUtils.isNotBlank(processDTO.getIp()))
-                .collect(Collectors.toList());
-        if (StringUtils.isNotBlank(targetIp)) {
-            dtoList = dtoList.stream().filter(processDTO -> 
Objects.equals(targetIp, processDTO.getIp())).collect(Collectors.toList());
-        }
-        List<TailLogProcessDTO> perOneIpProgressList = Lists.newArrayList();
+
+        List<TailLogProcessDTO> dtoList = new ArrayList<>();
         try {
+            for (List<UpdateLogProcessCmd.CollectDetail> collectDetails : 
tailProgressMap.values()) {
+                if (CollectionUtils.isEmpty(collectDetails)) {
+                    continue;
+                }
+
+                List<UpdateLogProcessCmd.CollectDetail> limitedDetails = 
collectDetails.stream()
+                        .filter(collectDetail -> 
Objects.equals(tailId.toString(), collectDetail.getTailId()))
+                        .limit(MAX_COLLECT_DETAIL_PER_IP)
+                        .collect(Collectors.toList());
+
+                for (UpdateLogProcessCmd.CollectDetail collectDetail : 
limitedDetails) {
+                    if 
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
+                        // Limit the number of file progress details again
+                        List<UpdateLogProcessCmd.FileProgressDetail> 
fileProgressDetails =
+                                collectDetail.getFileProgressDetails().stream()
+                                        .limit(MAX_COLLECT_DETAIL_PER_IP)
+                                        .collect(Collectors.toList());
+
+                        for (UpdateLogProcessCmd.FileProgressDetail 
fileProgressDetail : fileProgressDetails) {
+                            TailLogProcessDTO dto = TailLogProcessDTO.builder()
+                                    .tailName(tailName)
+                                    
.collectTime(fileProgressDetail.getCollectTime())
+                                    
.collectPercentage(fileProgressDetail.getCollectPercentage())
+                                    .ip(fileProgressDetail.getConfigIp())
+                                    .path(fileProgressDetail.getPattern())
+                                    
.fileRowNumber(fileProgressDetail.getFileRowNumber()).build();
+
+                            if (StringUtils.isNotBlank(dto.getIp())) {
+                                dtoList.add(dto);
+
+                                // Set a reasonable upper limit to prevent 
returning too much data
+                                if (dtoList.size() >= 
MAX_COLLECT_DETAIL_PER_IP) {
+                                    log.warn("getTailLogProcess reached size 
limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+                                    break;
+                                }
+                            }
+                        }
+                    }
+
+                    if (dtoList.size() >= MAX_COLLECT_DETAIL_PER_IP) {
+                        break;
+                    }
+                }
+
+                if (dtoList.size() >= MAX_COLLECT_DETAIL_PER_IP) {
+                    break;
+                }
+            }
+
+            if (StringUtils.isNotBlank(targetIp)) {
+                dtoList = dtoList.stream()
+                        .filter(processDTO -> Objects.equals(targetIp, 
processDTO.getIp()))
+                        .collect(Collectors.toList());
+            }
+
+            List<TailLogProcessDTO> perOneIpProgressList = 
Lists.newArrayList();
             perOneIpProgressList = getTailLogProcessDTOS(dtoList, 
perOneIpProgressList);
             perOneIpProgressList = filterExpireTimePath(perOneIpProgressList);
+
+            return perOneIpProgressList;
         } catch (Exception e) {
-            log.error("getTailLogProcess error,dtoList:{}", 
GSON.toJson(dtoList), e);
+            log.error("getTailLogProcess error", e);
+            return new ArrayList<>();
         }
-        return perOneIpProgressList;
     }
 
     @Override
@@ -116,32 +396,60 @@ public class DefaultLogProcessCollector implements 
LogProcessCollector {
         if (StringUtils.isEmpty(ip) || tailProgressMap.isEmpty()) {
             return dtoList;
         }
-        List<UpdateLogProcessCmd.CollectDetail> collect = 
tailProgressMap.values().stream()
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList());
-        collect.stream().forEach(collectDetail -> {
-            try {
-                String appName = collectDetail.getAppName();
-                if 
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
-                    
dtoList.addAll(collectDetail.getFileProgressDetails().stream()
-                            .filter(processDTO -> 
StringUtils.isNotBlank(processDTO.getConfigIp()))
-                            .filter(processDTO -> Objects.equals(ip, 
processDTO.getConfigIp()))
-                            .map(fileProgressDetail -> {
-                                AgentLogProcessDTO agentLogProcessDTO = new 
AgentLogProcessDTO();
-                                
agentLogProcessDTO.setPath(fileProgressDetail.getPattern());
-                                
agentLogProcessDTO.setFileRowNumber(fileProgressDetail.getFileRowNumber());
-                                
agentLogProcessDTO.setPointer(fileProgressDetail.getPointer());
-                                
agentLogProcessDTO.setFileMaxPointer(fileProgressDetail.getFileMaxPointer());
-                                agentLogProcessDTO.setAppName(appName);
-                                
agentLogProcessDTO.setCollectPercentage(fileProgressDetail.getCollectPercentage());
-                                
agentLogProcessDTO.setCollectTime(fileProgressDetail.getCollectTime());
-                                return agentLogProcessDTO;
-                            }).collect(Collectors.toList()));
+
+        try {
+            int processedCount = 0;
+            for (List<UpdateLogProcessCmd.CollectDetail> collectDetails : 
tailProgressMap.values()) {
+                if (CollectionUtils.isEmpty(collectDetails)) {
+                    continue;
+                }
+
+                for (UpdateLogProcessCmd.CollectDetail collectDetail : 
collectDetails) {
+                    try {
+                        String appName = collectDetail.getAppName();
+                        if 
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
+                            List<AgentLogProcessDTO> partialResults = 
collectDetail.getFileProgressDetails().stream()
+                                    .filter(processDTO -> 
StringUtils.isNotBlank(processDTO.getConfigIp()))
+                                    .filter(processDTO -> Objects.equals(ip, 
processDTO.getConfigIp()))
+                                    .limit(MAX_COLLECT_DETAIL_PER_IP)
+                                    .map(fileProgressDetail -> {
+                                        AgentLogProcessDTO agentLogProcessDTO 
= new AgentLogProcessDTO();
+                                        
agentLogProcessDTO.setPath(fileProgressDetail.getPattern());
+                                        
agentLogProcessDTO.setFileRowNumber(fileProgressDetail.getFileRowNumber());
+                                        
agentLogProcessDTO.setPointer(fileProgressDetail.getPointer());
+                                        
agentLogProcessDTO.setFileMaxPointer(fileProgressDetail.getFileMaxPointer());
+                                        agentLogProcessDTO.setAppName(appName);
+                                        
agentLogProcessDTO.setCollectPercentage(fileProgressDetail.getCollectPercentage());
+                                        
agentLogProcessDTO.setCollectTime(fileProgressDetail.getCollectTime());
+                                        return agentLogProcessDTO;
+                                    })
+                                    .collect(Collectors.toList());
+
+                            dtoList.addAll(partialResults);
+
+                            processedCount += partialResults.size();
+                            if (processedCount >= MAX_COLLECT_DETAIL_PER_IP) {
+                                log.warn("getAgentLogProcess reached size 
limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+                                break;
+                            }
+                        }
+                    } catch (Exception e) {
+                        log.error("getAgentLogProcess 
error,ip:{},CollectDetail:{}", ip, GSON.toJson(collectDetail), e);
+                    }
+
+                    if (processedCount >= MAX_COLLECT_DETAIL_PER_IP) {
+                        break;
+                    }
+                }
+
+                if (processedCount >= MAX_COLLECT_DETAIL_PER_IP) {
+                    break;
                 }
-            } catch (Exception e) {
-                log.error("getAgentLogProcess error,ip:{},CollectDetail:{}", 
ip, GSON.toJson(collectDetail), e);
             }
-        });
+        } catch (Exception e) {
+            log.error("getAgentLogProcess error, ip: {}", ip, e);
+        }
+
         return dtoList;
     }
 
@@ -151,21 +459,41 @@ public class DefaultLogProcessCollector implements 
LogProcessCollector {
         if (null == progressRation || tailProgressMap.isEmpty()) {
             return resultList;
         }
-        resultList = 
tailProgressMap.values().stream().flatMap(Collection::stream)
-                .map(collectDetail -> {
+
+        try {
+            int count = 0;
+            for (List<UpdateLogProcessCmd.CollectDetail> collectDetails : 
tailProgressMap.values()) {
+                for (UpdateLogProcessCmd.CollectDetail collectDetail : 
collectDetails) {
                     List<UpdateLogProcessCmd.FileProgressDetail> 
fileProgressDetails = collectDetail.getFileProgressDetails();
                     if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
                         List<UpdateLogProcessCmd.FileProgressDetail> 
progressDetails = fileProgressDetails.stream()
                                 .filter(fileProgressDetail -> 
lessThenRation(fileProgressDetail.getCollectPercentage(), progressRation))
                                 .filter(tailLogProcessDTO -> null != 
tailLogProcessDTO.getCollectTime() &&
                                         Instant.now().toEpochMilli() - 
tailLogProcessDTO.getCollectTime() < 
TimeUnit.HOURS.toMillis(MAX_STATIC_INTERRUPT_TIME_HOUR))
+                                .limit(MAX_COLLECT_DETAIL_PER_IP)
                                 .collect(Collectors.toList());
                         collectDetail.setFileProgressDetails(progressDetails);
                     }
-                    return collectDetail;
-                })
-                .filter(collectDetail -> 
CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails()))
-                .collect(Collectors.toList());
+
+                    if 
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
+                        resultList.add(collectDetail);
+                        count++;
+
+                        if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+                            log.warn("getColProcessImperfect reached size 
limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+                            break;
+                        }
+                    }
+                }
+
+                if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            log.error("getColProcessImperfect error", e);
+        }
+
         return resultList;
     }
 
@@ -176,12 +504,29 @@ public class DefaultLogProcessCollector implements 
LogProcessCollector {
             return resultList;
         }
         try {
+            int count = 0;
             for (List<UpdateLogProcessCmd.CollectDetail> details : 
tailProgressMap.values()) {
                 for (UpdateLogProcessCmd.CollectDetail detail : details) {
                     if (String.valueOf(tailId).equals(detail.getTailId())) {
-                        resultList.addAll(detail.getFileProgressDetails());
+                        List<UpdateLogProcessCmd.FileProgressDetail> 
fileProgressDetails = detail.getFileProgressDetails();
+                        if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
+                            List<UpdateLogProcessCmd.FileProgressDetail> 
limitedDetails = fileProgressDetails.stream()
+                                    .limit(MAX_COLLECT_DETAIL_PER_IP - count)
+                                    .collect(Collectors.toList());
+                            resultList.addAll(limitedDetails);
+                            count += limitedDetails.size();
+
+                            if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+                                log.warn("getFileProcessDetailByTail reached 
size limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+                                break;
+                            }
+                        }
                     }
                 }
+
+                if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+                    break;
+                }
             }
         } catch (Throwable t) {
             log.error("getFileProcessDetailByTail error : ", t);
diff --git 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
index dfd80ae1..4bc9946f 100644
--- 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
+++ 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
@@ -35,9 +35,7 @@ import org.apache.ozhera.log.utils.NetUtil;
 
 import javax.annotation.Resource;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.ozhera.log.common.Constant.GSON;
@@ -58,37 +56,59 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
     private RpcServer rpcServer;
 
     private static final String CONFIG_COMPRESS_KEY = 
"CONFIG_COMPRESS_ENABLED";
+    private static final String CONFIG_COMPRESS_MACHINE_KEY = 
"CONFIG_COMPRESS_MACHINE";
 
     private volatile boolean configCompressValue = false;
 
+    private volatile String configCompressMachine;
+
+    private final Random random = new Random();
 
     private static final ExecutorService SEND_CONFIG_EXECUTOR;
 
     static {
-        SEND_CONFIG_EXECUTOR = Executors.newThreadPerTaskExecutor(
-                Thread.ofVirtual()
-                        .name("send-config-vt-", 0)
-                        .uncaughtExceptionHandler((t, e) ->
-                                log.error("send config uncaught exception", e))
-                        .factory()
+        int corePoolSize = Math.max(2, 
Runtime.getRuntime().availableProcessors() / 2);
+        int maximumPoolSize = Runtime.getRuntime().availableProcessors();
+        int queueCapacity = 2000;
+
+        SEND_CONFIG_EXECUTOR = new ThreadPoolExecutor(
+                corePoolSize,
+                maximumPoolSize,
+                60L, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(queueCapacity),
+                r -> {
+                    Thread t = new Thread(r, "send-config-pool-" + 
COUNT_INCR.getAndIncrement());
+                    t.setDaemon(true);
+                    t.setUncaughtExceptionHandler((thread, e) ->
+                            log.error("send config uncaught exception in 
thread: {}", thread.getName(), e));
+                    return t;
+                },
+                (r, executor) -> log.warn("send config task rejected due to 
full queue, task will be dropped")
         );
     }
 
     public void init() {
-        String raw = System.getenv(CONFIG_COMPRESS_KEY);
-        if (StringUtils.isBlank(raw)) {
-            raw = System.getProperty(CONFIG_COMPRESS_KEY);
-        }
-        if (StringUtils.isNotBlank(raw)) {
+        String compressRaw = getConfig(CONFIG_COMPRESS_KEY);
+        configCompressMachine = getConfig(CONFIG_COMPRESS_MACHINE_KEY);
+        log.info("init configCompressValue {},configCompressMachine {}", 
configCompressValue, configCompressMachine);
+        if (StringUtils.isNotBlank(compressRaw)) {
             try {
-                configCompressValue = Boolean.parseBoolean(raw);
-                log.info("configCompressValue {}", configCompressValue);
+                configCompressValue = Boolean.parseBoolean(compressRaw);
+                log.info("configCompressValue {},configCompressMachine{}", 
configCompressValue, configCompressMachine);
             } catch (Exception e) {
-                log.error("parse {} error,use default value:{},config 
value:{}", CONFIG_COMPRESS_KEY, configCompressValue, raw);
+                log.error("parse {} error,use default value:{},config 
value:{}", CONFIG_COMPRESS_KEY, configCompressValue, compressRaw);
             }
         }
     }
 
+    private String getConfig(String key) {
+        String raw = System.getenv(key);
+        if (StringUtils.isBlank(raw)) {
+            raw = System.getProperty(key);
+        }
+        return raw;
+    }
+
 
     /**
      * dubbo interface, the timeout period cannot be too long
@@ -101,13 +121,27 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
         if (StringUtils.isBlank(agentIp) || meta == null) {
             return;
         }
-        doSendConfig(agentIp, meta);
+//        doSendConfigSync(agentIp, meta);
+        doSendConfigAsync(agentIp, meta);
 //        SEND_CONFIG_EXECUTOR.execute(() -> );
     }
 
-    private void doSendConfig(String agentIp, LogCollectMeta meta) {
+    @Override
+    public void sengConfigToAgentSync(String agentIp, LogCollectMeta 
logCollectMeta) {
+        if (StringUtils.isBlank(agentIp) || logCollectMeta == null) {
+            return;
+        }
+        doSendConfigSync(agentIp, logCollectMeta);
+    }
+
+    /**
+     * Send configuration to agent synchronously over the RPC channel.
+     * @param agentIp IP address of the target log agent
+     * @param meta    log collection metadata to deliver
+     */
+    private void doSendConfigSync(String agentIp, LogCollectMeta meta) {
         int count = 1;
-        while (count < 3) {
+        while (count < 2) {
             Map<String, AgentChannel> logAgentMap = getAgentChannelMap();
             String agentCurrentIp = getCorrectDockerAgentIP(agentIp, 
logAgentMap);
             if (logAgentMap.containsKey(agentCurrentIp)) {
@@ -116,8 +150,11 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
                     RemotingCommand req = 
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
                     req.setBody(sendStr.getBytes());
 
-                    if (configCompressValue) {
+                    if (configCompressValue || 
(StringUtils.isNotBlank(configCompressMachine) &&
+                            StringUtils.isNotBlank(meta.getAgentMachine()) &&
+                            
configCompressMachine.contains(meta.getAgentMachine()))) {
                         req.enableCompression();
+                        log.info("The configuration is compressed,agent 
ip:{},Configuration information:{}", agentCurrentIp, sendStr);
                     }
 
                     log.info("Send the configuration,agent ip:{},Configuration 
information:{}", agentCurrentIp, sendStr);
@@ -133,24 +170,150 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
             } else {
                 log.info("The current agent IP is not 
connected,ip:{},configuration data:{}", agentIp, GSON.toJson(meta));
             }
+            //Single attempt (loop runs once: count starts at 1, while count < 2),
+            //sleeping 100 ms after the send
             try {
-                TimeUnit.MILLISECONDS.sleep(200L);
+                TimeUnit.MILLISECONDS.sleep(100L);
             } catch (final InterruptedException ignored) {
             }
             count++;
         }
     }
 
+    /**
+     * Send configuration to agent via the async retry pipeline.
+     * NOTE(review): despite the name, this method then sleeps 800-1000 ms on the
+     * calling thread (throttling between sends) — confirm this blocking is intended.
+     *
+     * @param agentIp agent IP address
+     * @param meta    log collection metadata
+     */
+    public void doSendConfigAsync(String agentIp, LogCollectMeta meta) {
+        if (StringUtils.isBlank(agentIp) || meta == null) {
+            return;
+        }
+
+        sendWithRetry(agentIp, meta, 1)
+                .exceptionally(ex -> {
+                    log.error("send config failed after retry, agentIp:{}", 
agentIp, ex);
+                    return false;
+                });
+        try {
+            Thread.sleep(random.nextInt(800, 1000));
+        } catch (InterruptedException e) {
+            log.error("sleep interrupted, agentIp:{}", agentIp, e);
+        }
+    }
+
+    private CompletableFuture<Boolean> sendWithRetry(String agentIp,
+                                                     LogCollectMeta meta,
+                                                     int attempt) {
+        return trySendOnce(agentIp, meta)
+                .thenCompose(success -> {
+
+                    if (success) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+
+                    if (attempt > 2) {
+                        return CompletableFuture.completedFuture(false);
+                    }
+
+                    long delay = (long) Math.pow(2, attempt) * 200L;
+                    log.warn("send config retry attempt:{}, delay:{}ms, 
agentIp:{}",
+                            attempt, delay, agentIp);
+
+                    return CompletableFuture
+                            .runAsync(() -> {
+                            }, CompletableFuture.delayedExecutor(
+                                    delay,
+                                    TimeUnit.MILLISECONDS,
+                                    SEND_CONFIG_EXECUTOR))
+                            .thenCompose(v ->
+                                    sendWithRetry(agentIp, meta, attempt + 1));
+                });
+    }
+
+    private CompletableFuture<Boolean> trySendOnce(String agentIp,
+                                                   LogCollectMeta meta) {
+
+        Map<String, AgentChannel> logAgentMap = getAgentChannelMap();
+        String agentCurrentIp = getCorrectDockerAgentIP(agentIp, logAgentMap);
+
+        if (!logAgentMap.containsKey(agentCurrentIp)) {
+            log.warn("agent not connected, ip:{}", agentIp);
+            return CompletableFuture.completedFuture(false);
+        }
+
+        if (CollectionUtils.isEmpty(meta.getAppLogMetaList())) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        String sendStr = GSON.toJson(meta);
+
+        RemotingCommand req = 
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
+        req.setBody(sendStr.getBytes());
+
+        if (configCompressValue || 
(StringUtils.isNotBlank(configCompressMachine) &&
+                StringUtils.isNotBlank(meta.getAgentMachine()) &&
+                configCompressMachine.contains(meta.getAgentMachine()))) {
+            req.enableCompression();
+            log.info("The configuration is compressed,agent 
ip:{},Configuration information:{}", agentCurrentIp, sendStr);
+        }
+
+        Stopwatch started = Stopwatch.createStarted();
+
+        log.info("Send the configuration asynchronously,agent 
ip:{},Configuration information:{}", agentCurrentIp, sendStr);
+
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+
+        try {
+
+            rpcServer.send(
+                    logAgentMap.get(agentCurrentIp).getChannel(),
+                    req,
+                    10000,
+                    response -> {
+
+                        started.stop();
+
+                        if (response == null
+                                || response.getResponseCommand() == null) {
+
+                            future.complete(false);
+                            return;
+                        }
+
+                        String resp = new String(
+                                response.getResponseCommand().getBody());
+
+                        log.info("async send result:{}, cost:{}ms, ip:{}",
+                                resp,
+                                started.elapsed().toMillis(),
+                                agentCurrentIp);
+
+                        future.complete(Objects.equals(resp, "ok"));
+                    });
+
+        } catch (Exception e) {
+
+            log.error("send exception, agentIp:{}", agentIp, e);
+            future.complete(false);
+        }
+
+        return future;
+    }
+
     @Override
     public List<String> getAllAgentList() {
         List<String> remoteAddress = Lists.newArrayList();
-        List<String> ipAddress = Lists.newArrayList();
+//        List<String> ipAddress = Lists.newArrayList();
+        List<String> finalRemoteAddress = remoteAddress;
         AgentContext.ins().map.forEach((key, value) -> {
-            remoteAddress.add(key);
-            ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
+            finalRemoteAddress.add(key);
+//            ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
         });
-        if (COUNT_INCR.getAndIncrement() % 200 == 0) {
+        if (remoteAddress.size() > 1000) {
+            remoteAddress = remoteAddress.subList(0, 600);
+        }
+        if (COUNT_INCR.getAndIncrement() % 1000 == 0) {
             log.info("The set of remote addresses of the connected agent 
machine is:{}", GSON.toJson(remoteAddress));
         }
         return remoteAddress;
diff --git 
a/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
 
b/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
index 034fd7d5..16f4eb42 100644
--- 
a/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
+++ 
b/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
@@ -44,7 +44,7 @@ public class DefaultLogProcessCollectorTest {
 
     @Before
     public void buildBean() {
-        Ioc.ins().init("com.xiaomi");
+        Ioc.ins().init("com.xiaomi", "org.apache.ozhera.log");
         processCollector = Ioc.ins().getBean(DefaultLogProcessCollector.class);
         gson = new GsonBuilder().create();
     }
@@ -59,8 +59,10 @@ public class DefaultLogProcessCollectorTest {
         progressDetail.setPattern("/home/work/log/test/server.log");
         progressDetail.setCollectPercentage("98%");
         fileProgressDetails.add(progressDetail);
+        fileProgressDetails.add(progressDetail);
         collectDetail.setFileProgressDetails(fileProgressDetails);
         collectList.add(collectDetail);
+        collectList.add(collectDetail);
         updateLogProcessCmd.setCollectList(collectList);
         updateLogProcessCmd.setIp("127.0.0.1");
         processCollector.collectLogProcess(updateLogProcessCmd);
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index d0362c05..ac0da4e8 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-agent</artifactId>
-    <version>2.2.16-SNAPSHOT</version>
+    <version>2.2.17-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 6ea04c38..8d9cee84 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -530,6 +530,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
             Future<?> future = getExecutorServiceByType(logTypeEnum).submit(() 
-> {
                 try {
                     log.info("filePath:{},is VirtualThread {}, 
thread:{},id:{}", filePath, Thread.currentThread().isVirtual(), 
Thread.currentThread(), Thread.currentThread().threadId());
+                    logFile.setStop(false);
                     logFile.readLine();
                     // Reset exception count on successful read
                     fileExceptionCountMap.remove(filePath);
diff --git a/ozhera-log/log-api/pom.xml b/ozhera-log/log-api/pom.xml
index d69b7cf2..bd3b1de2 100644
--- a/ozhera-log/log-api/pom.xml
+++ b/ozhera-log/log-api/pom.xml
@@ -29,6 +29,7 @@ http://www.apache.org/licenses/LICENSE-2.0
 
     <modelVersion>4.0.0</modelVersion>
     <artifactId>log-api</artifactId>
+    <version>2.2.7-SNAPSHOT</version>
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
index 8095a68a..bdafa568 100644
--- 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
+++ 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
@@ -32,5 +32,7 @@ public interface PublishConfigService {
 
     void sengConfigToAgent(final String agentIp, LogCollectMeta 
logCollectMeta);
 
+    void sengConfigToAgentSync(final String agentIp, LogCollectMeta 
logCollectMeta);
+
     List<String> getAllAgentList();
 }
diff --git a/ozhera-log/log-manager/pom.xml b/ozhera-log/log-manager/pom.xml
index 395744dd..a2bdf3c1 100644
--- a/ozhera-log/log-manager/pom.xml
+++ b/ozhera-log/log-manager/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-manager</artifactId>
-    <version>2.2.8-SNAPSHOT</version>
+    <version>2.2.9-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
@@ -305,11 +305,13 @@ http://www.apache.org/licenses/LICENSE-2.0
         <dependency>
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
+            <version>3.3.0</version>
         </dependency>
 
         <dependency>
             <groupId>com.knuddels</groupId>
             <artifactId>jtokkit</artifactId>
+            <version>1.1.0</version>
         </dependency>
     </dependencies>
 
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
index a278c817..af8a519e 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
@@ -22,6 +22,12 @@ import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.xiaomi.data.push.context.AgentContext;
 import com.xiaomi.data.push.rpc.netty.AgentChannel;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.NamedThreadFactory;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.app.api.response.AppBaseInfo;
 import org.apache.ozhera.log.api.enums.LogTypeEnum;
 import org.apache.ozhera.log.api.enums.MQSourceEnum;
@@ -48,12 +54,6 @@ import 
org.apache.ozhera.log.manager.service.impl.HeraAppServiceImpl;
 import org.apache.ozhera.log.manager.service.impl.LogTailServiceImpl;
 import org.apache.ozhera.log.manager.service.path.LogPathMapping;
 import org.apache.ozhera.log.manager.service.path.LogPathMappingFactory;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.NamedThreadFactory;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.jetbrains.annotations.NotNull;
 
 import javax.annotation.Resource;
@@ -178,11 +178,14 @@ public class MilogAgentServiceImpl implements 
MilogAgentService {
         }
         printMangerInfo();
         AppBaseInfo appBaseInfo = heraAppService.queryById(milogAppId);
+        MilogLogTailDo milogLogTailDo = milogLogtailDao.queryById(tailId);
+        MilogLogStoreDO logStore = 
logstoreDao.queryById(milogLogTailDo.getStoreId());
         ips.forEach(ip -> {
             AppLogMeta appLogMeta = assembleSingleConfig(milogAppId, 
queryLogPattern(milogAppId, ip, appBaseInfo.getPlatformType()));
             LogCollectMeta logCollectMeta = new LogCollectMeta();
             logCollectMeta.setAgentIp(ip);
-            logCollectMeta.setAppLogMetaList(Arrays.asList(appLogMeta));
+            logCollectMeta.setAgentMachine(null != logStore && 
StringUtils.isNotEmpty(logStore.getMachineRoom()) ? logStore.getMachineRoom() : 
StringUtils.EMPTY);
+            logCollectMeta.setAppLogMetaList(List.of(appLogMeta));
             AgentDefine agentDefine = new AgentDefine();
             agentDefine.setFilters(new ArrayList<>());
             logCollectMeta.setAgentDefine(agentDefine);
@@ -199,8 +202,8 @@ public class MilogAgentServiceImpl implements 
MilogAgentService {
     }
 
     private void printMangerInfo() {
-        List<String> remoteAddress = publishConfigService.getAllAgentList();
         if (COUNT_INCR.getAndIncrement() % 200 == 0) {
+            List<String> remoteAddress = 
publishConfigService.getAllAgentList();
             log.info("The set of remote addresses for the connected agent 
machine is:{}", gson.toJson(remoteAddress));
         }
     }
@@ -223,7 +226,7 @@ public class MilogAgentServiceImpl implements 
MilogAgentService {
         ips.forEach(ip -> {
             LogCollectMeta logCollectMeta = new LogCollectMeta();
             logCollectMeta.setAgentIp(ip);
-            logCollectMeta.setAgentMachine("");
+            logCollectMeta.setAgentMachine(StringUtils.EMPTY);
             logCollectMeta.setAgentId("");
             logCollectMeta.setAppLogMetaList(Arrays.asList(appLogMeta));
             sengConfigToAgent(ip, logCollectMeta);
@@ -297,7 +300,7 @@ public class MilogAgentServiceImpl implements 
MilogAgentService {
     private LogCollectMeta buildLogCollectMeta(String agentIp) {
         LogCollectMeta logCollectMeta = new LogCollectMeta();
         logCollectMeta.setAgentIp(agentIp);
-        logCollectMeta.setAgentMachine("");
+        logCollectMeta.setAgentMachine(StringUtils.EMPTY);
         logCollectMeta.setAgentId("");
         return logCollectMeta;
     }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
index 034d7a74..790788d9 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
@@ -19,6 +19,9 @@
 package org.apache.ozhera.log.manager.service.impl;
 
 import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.api.model.meta.AppLogMeta;
 import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
 import org.apache.ozhera.log.api.model.meta.LogPattern;
@@ -26,8 +29,6 @@ import org.apache.ozhera.log.manager.dao.MilogAppTopicRelDao;
 import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
 import org.apache.ozhera.log.manager.model.pojo.MilogLogTailDo;
 import org.apache.ozhera.log.manager.service.MiLogMetaManageService;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Resource;
 import java.util.ArrayList;
@@ -55,7 +56,7 @@ public class MiLogMetaManageServiceImpl implements 
MiLogMetaManageService {
         LogCollectMeta meta = new LogCollectMeta();
         meta.setAgentId(agentId);
         meta.setAgentIp(agentIp);
-        meta.setAgentMachine(null);
+        meta.setAgentMachine(StringUtils.EMPTY);
         List<AppLogMeta> metaList = new ArrayList<>();
         for (Map.Entry<Long, List<MilogLogTailDo>> entry : 
miLogTailMap.entrySet()) {
             AppLogMeta appLogMeta = new AppLogMeta();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to