This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new ddc1cee3 feat: optimize log processing and configuration delivery
logic (#637)
ddc1cee3 is described below
commit ddc1cee3da7316116962d4701705a70f18f47a41
Author: wtt <[email protected]>
AuthorDate: Thu Mar 5 18:39:27 2026 +0800
feat: optimize log processing and configuration delivery logic (#637)
* fix: solve the blocking problem caused by serverless startup
* fix: fix pipeline execution logic error
* feat: optimize log processing and configuration delivery logic
* fix: fix the logic problem when obtaining the list of all agents
* refactor: optimize log collection and configure publishing logic
* fix: correct comment content and adjust code style
* feat: add synchronous sending configuration function and optimize sending
logic
* refactor: optimize agentMachine default value and collection
initialization method
---
ozhera-log/log-agent-server/pom.xml | 3 +-
.../server/service/DefaultLogProcessCollector.java | 447 ++++++++++++++++++---
.../service/DefaultPublishConfigService.java | 217 ++++++++--
.../log/server/DefaultLogProcessCollectorTest.java | 4 +-
ozhera-log/log-agent/pom.xml | 2 +-
.../log/agent/channel/ChannelServiceImpl.java | 1 +
ozhera-log/log-api/pom.xml | 1 +
.../log/api/service/PublishConfigService.java | 2 +
ozhera-log/log-manager/pom.xml | 4 +-
.../extension/agent/MilogAgentServiceImpl.java | 23 +-
.../service/impl/MiLogMetaManageServiceImpl.java | 7 +-
11 files changed, 616 insertions(+), 95 deletions(-)
diff --git a/ozhera-log/log-agent-server/pom.xml
b/ozhera-log/log-agent-server/pom.xml
index 531ac649..691da14c 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -26,7 +26,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<version>2.2.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <version>2.2.8-SNAPSHOT</version>
+ <version>2.2.9-SNAPSHOT</version>
<artifactId>log-agent-server</artifactId>
@@ -82,6 +82,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<dependency>
<groupId>org.apache.ozhera</groupId>
<artifactId>log-api</artifactId>
+ <version>2.2.7-SNAPSHOT</version>
</dependency>
<dependency>
diff --git
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
index 7bc32451..079c6b56 100644
---
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
+++
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultLogProcessCollector.java
@@ -42,6 +42,8 @@ import javax.annotation.Resource;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -61,6 +63,12 @@ public class DefaultLogProcessCollector implements
LogProcessCollector {
private final Map<String, List<UpdateLogProcessCmd.CollectDetail>>
tailProgressMap = new ConcurrentHashMap<>(256);
+ // Maximum number of stored IPs to prevent unlimited memory growth
+ private static final int MAX_IP_COUNT = 200000;
+
+ // The maximum number of CollectDetails under each IP to prevent excessive
data for a single IP
+ private static final int MAX_COLLECT_DETAIL_PER_IP = 5000;
+
private static final Integer MAX_INTERRUPT_TIME = 10;
private static final Integer MAX_STATIC_INTERRUPT_TIME_HOUR = 4;
@@ -70,13 +78,242 @@ public class DefaultLogProcessCollector implements
LogProcessCollector {
@Resource
private RpcServer rpcServer;
+ public void init() {
+ // Scheduled cleaning tasks
+ ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "LogProcessCollector-Cleanup-Thread");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Every 2 minutes, perform cleanup of expired data
+
scheduledExecutorService.scheduleWithFixedDelay(this::cleanupExpiredData, 2, 2,
TimeUnit.MINUTES);
+ log.info("Initialized scheduled cleanup for LogProcessCollector");
+ }
+
+
@Override
public void collectLogProcess(UpdateLogProcessCmd cmd) {
log.debug("[LogProcess.updateLogProcess] cmd:{} ", cmd);
if (cmd == null || StringUtils.isEmpty(cmd.getIp())) {
return;
}
- tailProgressMap.put(cmd.getIp(), cmd.getCollectList());
+
+ // Deduplicate the collected data to avoid duplicate data
+ List<UpdateLogProcessCmd.CollectDetail> deduplicatedCollectList =
deduplicateCollectList(cmd.getCollectList());
+
+ // Control the size of the map to prevent memory overflow
+ if (tailProgressMap.size() >= MAX_IP_COUNT) {
+ log.warn("tailProgressMap size reaches limit {}, removing oldest
entries", MAX_IP_COUNT);
+ // Remove some old entries, keep the latest
+ removeOldestEntries();
+ }
+
+ tailProgressMap.put(cmd.getIp(), deduplicatedCollectList);
+ }
+
+ /**
+ * Deduplicate the collect list
+ */
+ private List<UpdateLogProcessCmd.CollectDetail>
deduplicateCollectList(List<UpdateLogProcessCmd.CollectDetail> collectList) {
+ if (CollectionUtils.isEmpty(collectList)) {
+ return collectList;
+ }
+
+ // Use LinkedHashSet to keep order and remove duplicates
+ Set<UpdateLogProcessCmd.CollectDetail> uniqueSet = new
LinkedHashSet<>();
+ for (UpdateLogProcessCmd.CollectDetail detail : collectList) {
+ boolean exists = false;
+ for (UpdateLogProcessCmd.CollectDetail existingDetail : uniqueSet)
{
+ if (isCollectDetailEqual(existingDetail, detail)) {
+ exists = true;
+ // Update with newer data
+ if (isNewerThan(detail, existingDetail)) {
+ uniqueSet.remove(existingDetail);
+ uniqueSet.add(detail);
+ }
+ break;
+ }
+ }
+ if (!exists) {
+ // Limit the number of fileProgressDetails in each
CollectDetail
+ if (detail.getFileProgressDetails() != null &&
+ detail.getFileProgressDetails().size() >
MAX_COLLECT_DETAIL_PER_IP) {
+ // Only keep the latest part of the records
+ List<UpdateLogProcessCmd.FileProgressDetail>
limitedDetails =
+ detail.getFileProgressDetails().stream()
+ .sorted((a, b) -> {
+ Long timeA = a.getCollectTime();
+ Long timeB = b.getCollectTime();
+ if (timeA == null && timeB == null)
return 0;
+ if (timeA == null) return -1;
+ if (timeB == null) return 1;
+ return timeB.compareTo(timeA); // ιεΊζε
+ })
+ .limit(MAX_COLLECT_DETAIL_PER_IP)
+ .collect(Collectors.toList());
+ detail.setFileProgressDetails(limitedDetails);
+ }
+ uniqueSet.add(detail);
+ }
+ }
+
+ // If the deduplicated data is still too much, only keep the latest
part
+ List<UpdateLogProcessCmd.CollectDetail> result = new
ArrayList<>(uniqueSet);
+ if (result.size() > MAX_COLLECT_DETAIL_PER_IP) {
+ result = result.stream()
+ .sorted((a, b) -> {
+ // Sort by time, take the latest
+ List<UpdateLogProcessCmd.FileProgressDetail> detailsA
= a.getFileProgressDetails();
+ List<UpdateLogProcessCmd.FileProgressDetail> detailsB
= b.getFileProgressDetails();
+
+ Long latestTimeA = getLatestCollectTime(detailsA);
+ Long latestTimeB = getLatestCollectTime(detailsB);
+
+ if (latestTimeA == null && latestTimeB == null) {
+ return 0;
+ }
+ if (latestTimeA == null) {
+ return -1;
+ }
+ if (latestTimeB == null) {
+ return 1;
+ }
+ return latestTimeB.compareTo(latestTimeA);
+ })
+ .limit(MAX_COLLECT_DETAIL_PER_IP)
+ .collect(Collectors.toList());
+ }
+
+ return result;
+ }
+
+ /**
+ * Determine if two CollectDetails are equal (based on key fields)
+ */
+ private boolean isCollectDetailEqual(UpdateLogProcessCmd.CollectDetail
detail1, UpdateLogProcessCmd.CollectDetail detail2) {
+ if (detail1 == detail2) return true;
+ if (detail1 == null || detail2 == null) return false;
+
+ // Compare key fields
+ return Objects.equals(detail1.getTailId(), detail2.getTailId()) &&
+ Objects.equals(detail1.getTailName(), detail2.getTailName()) &&
+ Objects.equals(detail1.getPath(), detail2.getPath()) &&
+ Objects.equals(detail1.getAppId(), detail2.getAppId());
+ }
+
+ /**
+ * Determine if detail1 is newer than detail2
+ */
+ private boolean isNewerThan(UpdateLogProcessCmd.CollectDetail detail1,
UpdateLogProcessCmd.CollectDetail detail2) {
+ Long latestTime1 =
getLatestCollectTime(detail1.getFileProgressDetails());
+ Long latestTime2 =
getLatestCollectTime(detail2.getFileProgressDetails());
+
+ if (latestTime1 == null && latestTime2 == null) return false;
+ if (latestTime1 == null) return false;
+ if (latestTime2 == null) return true;
+
+ return latestTime1 > latestTime2;
+ }
+
+ /**
+ * Get the latest collect time from file progress details
+ */
+ private Long
getLatestCollectTime(List<UpdateLogProcessCmd.FileProgressDetail> details) {
+ if (CollectionUtils.isEmpty(details)) {
+ return null;
+ }
+ return details.stream()
+ .map(UpdateLogProcessCmd.FileProgressDetail::getCollectTime)
+ .filter(Objects::nonNull)
+ .max(Long::compareTo)
+ .orElse(null);
+ }
+
+ /**
+ * Remove oldest entries to control map size
+ */
+ private void removeOldestEntries() {
+ // Get all keys and remove a part of them according to some strategy
+ List<String> allKeys = new ArrayList<>(tailProgressMap.keySet());
+ if (allKeys.size() <= MAX_IP_COUNT / 2) {
+ return; // If the number doesn't exceed half the limit, no removal
is needed
+ }
+
+ // Simple strategy: remove the latter half of the entries
+ allKeys.sort(String::compareTo);
+
+ int removeCount = allKeys.size() / 4;
+
+ for (int i = 0; i < removeCount && i < allKeys.size(); i++) {
+ tailProgressMap.remove(allKeys.get(i));
+ }
+
+ log.info("Removed {} old entries from tailProgressMap, current size:
{}", removeCount, tailProgressMap.size());
+ }
+
+ /**
+ * Clean up expired data
+ */
+ private void cleanupExpiredData() {
+ try {
+ int removedCount = 0;
+ Iterator<Map.Entry<String,
List<UpdateLogProcessCmd.CollectDetail>>> iterator =
tailProgressMap.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, List<UpdateLogProcessCmd.CollectDetail>>
entry = iterator.next();
+ List<UpdateLogProcessCmd.CollectDetail> collectDetails =
entry.getValue();
+
+ List<UpdateLogProcessCmd.CollectDetail> filteredDetails = new
ArrayList<>();
+ for (UpdateLogProcessCmd.CollectDetail detail :
collectDetails) {
+ List<UpdateLogProcessCmd.FileProgressDetail>
fileProgressDetails = detail.getFileProgressDetails();
+ if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
+ // Filter out data that has not been updated for a
specified period of time
+ List<UpdateLogProcessCmd.FileProgressDetail>
recentDetails = fileProgressDetails.stream()
+ .filter(fileDetail -> {
+ Long collectTime =
fileDetail.getCollectTime();
+ if (collectTime == null) {
+ return false;
+ }
+ return Instant.now().toEpochMilli() -
collectTime < TimeUnit.MINUTES.toMillis(MAX_INTERRUPT_TIME);
+ })
+ .collect(Collectors.toList());
+
+ if (!recentDetails.isEmpty()) {
+ UpdateLogProcessCmd.CollectDetail newDetail = new
UpdateLogProcessCmd.CollectDetail();
+ newDetail.setTailId(detail.getTailId());
+ newDetail.setTailName(detail.getTailName());
+ newDetail.setIpList(detail.getIpList());
+ newDetail.setPath(detail.getPath());
+ newDetail.setAppId(detail.getAppId());
+ newDetail.setAppName(detail.getAppName());
+ newDetail.setFileProgressDetails(recentDetails);
+ filteredDetails.add(newDetail);
+ }
+ } else {
+ filteredDetails.add(detail);
+ }
+ }
+
+ if (filteredDetails.isEmpty()) {
+ // If there is no valid data after filtering, remove the
entire entry
+ iterator.remove();
+ removedCount++;
+ } else {
+ entry.setValue(filteredDetails);
+ }
+ }
+
+ if (removedCount > 0) {
+ log.info("Cleaned up {} expired entries from tailProgressMap",
removedCount);
+ }
+
+ if (tailProgressMap.size() > MAX_IP_COUNT) {
+ removeOldestEntries();
+ }
+ } catch (Exception e) {
+ log.error("Error during cleanupExpiredData", e);
+ }
}
@Override
@@ -84,30 +321,73 @@ public class DefaultLogProcessCollector implements
LogProcessCollector {
if (null == tailId || StringUtils.isBlank(tailName)) {
return new ArrayList<>();
}
- List<TailLogProcessDTO> dtoList = tailProgressMap.values().stream()
- .flatMap(Collection::stream)
- .filter(collectDetail -> Objects.equals(tailId.toString(),
collectDetail.getTailId()))
- .flatMap(collectDetail ->
collectDetail.getFileProgressDetails().stream())
- .map(fileProgressDetail -> TailLogProcessDTO.builder()
- .tailName(tailName)
- .collectTime(fileProgressDetail.getCollectTime())
-
.collectPercentage(fileProgressDetail.getCollectPercentage())
- .ip(fileProgressDetail.getConfigIp())
- .path(fileProgressDetail.getPattern())
-
.fileRowNumber(fileProgressDetail.getFileRowNumber()).build())
- .filter(processDTO ->
StringUtils.isNotBlank(processDTO.getIp()))
- .collect(Collectors.toList());
- if (StringUtils.isNotBlank(targetIp)) {
- dtoList = dtoList.stream().filter(processDTO ->
Objects.equals(targetIp, processDTO.getIp())).collect(Collectors.toList());
- }
- List<TailLogProcessDTO> perOneIpProgressList = Lists.newArrayList();
+
+ List<TailLogProcessDTO> dtoList = new ArrayList<>();
try {
+ for (List<UpdateLogProcessCmd.CollectDetail> collectDetails :
tailProgressMap.values()) {
+ if (CollectionUtils.isEmpty(collectDetails)) {
+ continue;
+ }
+
+ List<UpdateLogProcessCmd.CollectDetail> limitedDetails =
collectDetails.stream()
+ .filter(collectDetail ->
Objects.equals(tailId.toString(), collectDetail.getTailId()))
+ .limit(MAX_COLLECT_DETAIL_PER_IP)
+ .collect(Collectors.toList());
+
+ for (UpdateLogProcessCmd.CollectDetail collectDetail :
limitedDetails) {
+ if
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
+ // Limit the number of file progress details again
+ List<UpdateLogProcessCmd.FileProgressDetail>
fileProgressDetails =
+ collectDetail.getFileProgressDetails().stream()
+ .limit(MAX_COLLECT_DETAIL_PER_IP)
+ .collect(Collectors.toList());
+
+ for (UpdateLogProcessCmd.FileProgressDetail
fileProgressDetail : fileProgressDetails) {
+ TailLogProcessDTO dto = TailLogProcessDTO.builder()
+ .tailName(tailName)
+
.collectTime(fileProgressDetail.getCollectTime())
+
.collectPercentage(fileProgressDetail.getCollectPercentage())
+ .ip(fileProgressDetail.getConfigIp())
+ .path(fileProgressDetail.getPattern())
+
.fileRowNumber(fileProgressDetail.getFileRowNumber()).build();
+
+ if (StringUtils.isNotBlank(dto.getIp())) {
+ dtoList.add(dto);
+
+ // Set a reasonable upper limit to prevent
returning too much data
+ if (dtoList.size() >=
MAX_COLLECT_DETAIL_PER_IP) {
+ log.warn("getTailLogProcess reached size
limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+ break;
+ }
+ }
+ }
+ }
+
+ if (dtoList.size() >= MAX_COLLECT_DETAIL_PER_IP) {
+ break;
+ }
+ }
+
+ if (dtoList.size() >= MAX_COLLECT_DETAIL_PER_IP) {
+ break;
+ }
+ }
+
+ if (StringUtils.isNotBlank(targetIp)) {
+ dtoList = dtoList.stream()
+ .filter(processDTO -> Objects.equals(targetIp,
processDTO.getIp()))
+ .collect(Collectors.toList());
+ }
+
+ List<TailLogProcessDTO> perOneIpProgressList =
Lists.newArrayList();
perOneIpProgressList = getTailLogProcessDTOS(dtoList,
perOneIpProgressList);
perOneIpProgressList = filterExpireTimePath(perOneIpProgressList);
+
+ return perOneIpProgressList;
} catch (Exception e) {
- log.error("getTailLogProcess error,dtoList:{}",
GSON.toJson(dtoList), e);
+ log.error("getTailLogProcess error", e);
+ return new ArrayList<>();
}
- return perOneIpProgressList;
}
@Override
@@ -116,32 +396,60 @@ public class DefaultLogProcessCollector implements
LogProcessCollector {
if (StringUtils.isEmpty(ip) || tailProgressMap.isEmpty()) {
return dtoList;
}
- List<UpdateLogProcessCmd.CollectDetail> collect =
tailProgressMap.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
- collect.stream().forEach(collectDetail -> {
- try {
- String appName = collectDetail.getAppName();
- if
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
-
dtoList.addAll(collectDetail.getFileProgressDetails().stream()
- .filter(processDTO ->
StringUtils.isNotBlank(processDTO.getConfigIp()))
- .filter(processDTO -> Objects.equals(ip,
processDTO.getConfigIp()))
- .map(fileProgressDetail -> {
- AgentLogProcessDTO agentLogProcessDTO = new
AgentLogProcessDTO();
-
agentLogProcessDTO.setPath(fileProgressDetail.getPattern());
-
agentLogProcessDTO.setFileRowNumber(fileProgressDetail.getFileRowNumber());
-
agentLogProcessDTO.setPointer(fileProgressDetail.getPointer());
-
agentLogProcessDTO.setFileMaxPointer(fileProgressDetail.getFileMaxPointer());
- agentLogProcessDTO.setAppName(appName);
-
agentLogProcessDTO.setCollectPercentage(fileProgressDetail.getCollectPercentage());
-
agentLogProcessDTO.setCollectTime(fileProgressDetail.getCollectTime());
- return agentLogProcessDTO;
- }).collect(Collectors.toList()));
+
+ try {
+ int processedCount = 0;
+ for (List<UpdateLogProcessCmd.CollectDetail> collectDetails :
tailProgressMap.values()) {
+ if (CollectionUtils.isEmpty(collectDetails)) {
+ continue;
+ }
+
+ for (UpdateLogProcessCmd.CollectDetail collectDetail :
collectDetails) {
+ try {
+ String appName = collectDetail.getAppName();
+ if
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
+ List<AgentLogProcessDTO> partialResults =
collectDetail.getFileProgressDetails().stream()
+ .filter(processDTO ->
StringUtils.isNotBlank(processDTO.getConfigIp()))
+ .filter(processDTO -> Objects.equals(ip,
processDTO.getConfigIp()))
+ .limit(MAX_COLLECT_DETAIL_PER_IP)
+ .map(fileProgressDetail -> {
+ AgentLogProcessDTO agentLogProcessDTO
= new AgentLogProcessDTO();
+
agentLogProcessDTO.setPath(fileProgressDetail.getPattern());
+
agentLogProcessDTO.setFileRowNumber(fileProgressDetail.getFileRowNumber());
+
agentLogProcessDTO.setPointer(fileProgressDetail.getPointer());
+
agentLogProcessDTO.setFileMaxPointer(fileProgressDetail.getFileMaxPointer());
+ agentLogProcessDTO.setAppName(appName);
+
agentLogProcessDTO.setCollectPercentage(fileProgressDetail.getCollectPercentage());
+
agentLogProcessDTO.setCollectTime(fileProgressDetail.getCollectTime());
+ return agentLogProcessDTO;
+ })
+ .collect(Collectors.toList());
+
+ dtoList.addAll(partialResults);
+
+ processedCount += partialResults.size();
+ if (processedCount >= MAX_COLLECT_DETAIL_PER_IP) {
+ log.warn("getAgentLogProcess reached size
limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ log.error("getAgentLogProcess
error,ip:{},CollectDetail:{}", ip, GSON.toJson(collectDetail), e);
+ }
+
+ if (processedCount >= MAX_COLLECT_DETAIL_PER_IP) {
+ break;
+ }
+ }
+
+ if (processedCount >= MAX_COLLECT_DETAIL_PER_IP) {
+ break;
}
- } catch (Exception e) {
- log.error("getAgentLogProcess error,ip:{},CollectDetail:{}",
ip, GSON.toJson(collectDetail), e);
}
- });
+ } catch (Exception e) {
+ log.error("getAgentLogProcess error, ip: {}", ip, e);
+ }
+
return dtoList;
}
@@ -151,21 +459,41 @@ public class DefaultLogProcessCollector implements
LogProcessCollector {
if (null == progressRation || tailProgressMap.isEmpty()) {
return resultList;
}
- resultList =
tailProgressMap.values().stream().flatMap(Collection::stream)
- .map(collectDetail -> {
+
+ try {
+ int count = 0;
+ for (List<UpdateLogProcessCmd.CollectDetail> collectDetails :
tailProgressMap.values()) {
+ for (UpdateLogProcessCmd.CollectDetail collectDetail :
collectDetails) {
List<UpdateLogProcessCmd.FileProgressDetail>
fileProgressDetails = collectDetail.getFileProgressDetails();
if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
List<UpdateLogProcessCmd.FileProgressDetail>
progressDetails = fileProgressDetails.stream()
.filter(fileProgressDetail ->
lessThenRation(fileProgressDetail.getCollectPercentage(), progressRation))
.filter(tailLogProcessDTO -> null !=
tailLogProcessDTO.getCollectTime() &&
Instant.now().toEpochMilli() -
tailLogProcessDTO.getCollectTime() <
TimeUnit.HOURS.toMillis(MAX_STATIC_INTERRUPT_TIME_HOUR))
+ .limit(MAX_COLLECT_DETAIL_PER_IP)
.collect(Collectors.toList());
collectDetail.setFileProgressDetails(progressDetails);
}
- return collectDetail;
- })
- .filter(collectDetail ->
CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails()))
- .collect(Collectors.toList());
+
+ if
(CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
+ resultList.add(collectDetail);
+ count++;
+
+ if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+ log.warn("getColProcessImperfect reached size
limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+ break;
+ }
+ }
+ }
+
+ if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ log.error("getColProcessImperfect error", e);
+ }
+
return resultList;
}
@@ -176,12 +504,29 @@ public class DefaultLogProcessCollector implements
LogProcessCollector {
return resultList;
}
try {
+ int count = 0;
for (List<UpdateLogProcessCmd.CollectDetail> details :
tailProgressMap.values()) {
for (UpdateLogProcessCmd.CollectDetail detail : details) {
if (String.valueOf(tailId).equals(detail.getTailId())) {
- resultList.addAll(detail.getFileProgressDetails());
+ List<UpdateLogProcessCmd.FileProgressDetail>
fileProgressDetails = detail.getFileProgressDetails();
+ if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
+ List<UpdateLogProcessCmd.FileProgressDetail>
limitedDetails = fileProgressDetails.stream()
+ .limit(MAX_COLLECT_DETAIL_PER_IP - count)
+ .collect(Collectors.toList());
+ resultList.addAll(limitedDetails);
+ count += limitedDetails.size();
+
+ if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+ log.warn("getFileProcessDetailByTail reached
size limit: {}, stopping processing", MAX_COLLECT_DETAIL_PER_IP);
+ break;
+ }
+ }
}
}
+
+ if (count >= MAX_COLLECT_DETAIL_PER_IP) {
+ break;
+ }
}
} catch (Throwable t) {
log.error("getFileProcessDetailByTail error : ", t);
diff --git
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
index dfd80ae1..4bc9946f 100644
---
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
+++
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
@@ -35,9 +35,7 @@ import org.apache.ozhera.log.utils.NetUtil;
import javax.annotation.Resource;
import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.ozhera.log.common.Constant.GSON;
@@ -58,37 +56,59 @@ public class DefaultPublishConfigService implements
PublishConfigService {
private RpcServer rpcServer;
private static final String CONFIG_COMPRESS_KEY =
"CONFIG_COMPRESS_ENABLED";
+ private static final String CONFIG_COMPRESS_MACHINE_KEY =
"CONFIG_COMPRESS_MACHINE";
private volatile boolean configCompressValue = false;
+ private volatile String configCompressMachine;
+
+ private final Random random = new Random();
private static final ExecutorService SEND_CONFIG_EXECUTOR;
static {
- SEND_CONFIG_EXECUTOR = Executors.newThreadPerTaskExecutor(
- Thread.ofVirtual()
- .name("send-config-vt-", 0)
- .uncaughtExceptionHandler((t, e) ->
- log.error("send config uncaught exception", e))
- .factory()
+ int corePoolSize = Math.max(2,
Runtime.getRuntime().availableProcessors() / 2);
+ int maximumPoolSize = Runtime.getRuntime().availableProcessors();
+ int queueCapacity = 2000;
+
+ SEND_CONFIG_EXECUTOR = new ThreadPoolExecutor(
+ corePoolSize,
+ maximumPoolSize,
+ 60L, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(queueCapacity),
+ r -> {
+ Thread t = new Thread(r, "send-config-pool-" +
COUNT_INCR.getAndIncrement());
+ t.setDaemon(true);
+ t.setUncaughtExceptionHandler((thread, e) ->
+ log.error("send config uncaught exception in
thread: {}", thread.getName(), e));
+ return t;
+ },
+ (r, executor) -> log.warn("send config task rejected due to
full queue, task will be dropped")
);
}
public void init() {
- String raw = System.getenv(CONFIG_COMPRESS_KEY);
- if (StringUtils.isBlank(raw)) {
- raw = System.getProperty(CONFIG_COMPRESS_KEY);
- }
- if (StringUtils.isNotBlank(raw)) {
+ String compressRaw = getConfig(CONFIG_COMPRESS_KEY);
+ configCompressMachine = getConfig(CONFIG_COMPRESS_MACHINE_KEY);
+ log.info("init configCompressValue {},configCompressMachine {}",
configCompressValue, configCompressMachine);
+ if (StringUtils.isNotBlank(compressRaw)) {
try {
- configCompressValue = Boolean.parseBoolean(raw);
- log.info("configCompressValue {}", configCompressValue);
+ configCompressValue = Boolean.parseBoolean(compressRaw);
+ log.info("configCompressValue {},configCompressMachine{}",
configCompressValue, configCompressMachine);
} catch (Exception e) {
- log.error("parse {} error,use default value:{},config
value:{}", CONFIG_COMPRESS_KEY, configCompressValue, raw);
+ log.error("parse {} error,use default value:{},config
value:{}", CONFIG_COMPRESS_KEY, configCompressValue, compressRaw);
}
}
}
+ private String getConfig(String key) {
+ String raw = System.getenv(key);
+ if (StringUtils.isBlank(raw)) {
+ raw = System.getProperty(key);
+ }
+ return raw;
+ }
+
/**
* dubbo interface, the timeout period cannot be too long
@@ -101,13 +121,27 @@ public class DefaultPublishConfigService implements
PublishConfigService {
if (StringUtils.isBlank(agentIp) || meta == null) {
return;
}
- doSendConfig(agentIp, meta);
+// doSendConfigSync(agentIp, meta);
+ doSendConfigAsync(agentIp, meta);
// SEND_CONFIG_EXECUTOR.execute(() -> );
}
- private void doSendConfig(String agentIp, LogCollectMeta meta) {
+ @Override
+ public void sengConfigToAgentSync(String agentIp, LogCollectMeta
logCollectMeta) {
+ if (StringUtils.isBlank(agentIp) || logCollectMeta == null) {
+ return;
+ }
+ doSendConfigSync(agentIp, logCollectMeta);
+ }
+
+ /**
+ * Send configuration to agent synchronously
+ * @param agentIp
+ * @param meta
+ */
+ private void doSendConfigSync(String agentIp, LogCollectMeta meta) {
int count = 1;
- while (count < 3) {
+ while (count < 2) {
Map<String, AgentChannel> logAgentMap = getAgentChannelMap();
String agentCurrentIp = getCorrectDockerAgentIP(agentIp,
logAgentMap);
if (logAgentMap.containsKey(agentCurrentIp)) {
@@ -116,8 +150,11 @@ public class DefaultPublishConfigService implements
PublishConfigService {
RemotingCommand req =
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
req.setBody(sendStr.getBytes());
- if (configCompressValue) {
+ if (configCompressValue ||
(StringUtils.isNotBlank(configCompressMachine) &&
+ StringUtils.isNotBlank(meta.getAgentMachine()) &&
+
configCompressMachine.contains(meta.getAgentMachine()))) {
req.enableCompression();
+ log.info("The configuration is compressed,agent
ip:{},Configuration information:{}", agentCurrentIp, sendStr);
}
log.info("Send the configuration,agent ip:{},Configuration
information:{}", agentCurrentIp, sendStr);
@@ -133,24 +170,150 @@ public class DefaultPublishConfigService implements
PublishConfigService {
} else {
log.info("The current agent IP is not
connected,ip:{},configuration data:{}", agentIp, GSON.toJson(meta));
}
- //Retry policy - Retry 4 times, sleep 200 ms each time
+ //Retry policy - Retry 2 times, sleep 100 ms each time
try {
- TimeUnit.MILLISECONDS.sleep(200L);
+ TimeUnit.MILLISECONDS.sleep(100L);
} catch (final InterruptedException ignored) {
}
count++;
}
}
+ /**
+ * Send configuration to agent asynchronously
+ *
+ * @param agentIp agent IP address
+ * @param meta log collection metadata
+ */
+ public void doSendConfigAsync(String agentIp, LogCollectMeta meta) {
+ if (StringUtils.isBlank(agentIp) || meta == null) {
+ return;
+ }
+
+ sendWithRetry(agentIp, meta, 1)
+ .exceptionally(ex -> {
+ log.error("send config failed after retry, agentIp:{}",
agentIp, ex);
+ return false;
+ });
+ try {
+ Thread.sleep(random.nextInt(800, 1000));
+ } catch (InterruptedException e) {
+ log.error("sleep interrupted, agentIp:{}", agentIp, e);
+ }
+ }
+
+ private CompletableFuture<Boolean> sendWithRetry(String agentIp,
+ LogCollectMeta meta,
+ int attempt) {
+ return trySendOnce(agentIp, meta)
+ .thenCompose(success -> {
+
+ if (success) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (attempt > 2) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ long delay = (long) Math.pow(2, attempt) * 200L;
+ log.warn("send config retry attempt:{}, delay:{}ms,
agentIp:{}",
+ attempt, delay, agentIp);
+
+ return CompletableFuture
+ .runAsync(() -> {
+ }, CompletableFuture.delayedExecutor(
+ delay,
+ TimeUnit.MILLISECONDS,
+ SEND_CONFIG_EXECUTOR))
+ .thenCompose(v ->
+ sendWithRetry(agentIp, meta, attempt + 1));
+ });
+ }
+
+ private CompletableFuture<Boolean> trySendOnce(String agentIp,
+ LogCollectMeta meta) {
+
+ Map<String, AgentChannel> logAgentMap = getAgentChannelMap();
+ String agentCurrentIp = getCorrectDockerAgentIP(agentIp, logAgentMap);
+
+ if (!logAgentMap.containsKey(agentCurrentIp)) {
+ log.warn("agent not connected, ip:{}", agentIp);
+ return CompletableFuture.completedFuture(false);
+ }
+
+ if (CollectionUtils.isEmpty(meta.getAppLogMetaList())) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ String sendStr = GSON.toJson(meta);
+
+ RemotingCommand req =
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
+ req.setBody(sendStr.getBytes());
+
+ if (configCompressValue ||
(StringUtils.isNotBlank(configCompressMachine) &&
+ StringUtils.isNotBlank(meta.getAgentMachine()) &&
+ configCompressMachine.contains(meta.getAgentMachine()))) {
+ req.enableCompression();
+ log.info("The configuration is compressed,agent
ip:{},Configuration information:{}", agentCurrentIp, sendStr);
+ }
+
+ Stopwatch started = Stopwatch.createStarted();
+
+ log.info("Send the configuration asynchronously,agent
ip:{},Configuration information:{}", agentCurrentIp, sendStr);
+
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+
+ try {
+
+ rpcServer.send(
+ logAgentMap.get(agentCurrentIp).getChannel(),
+ req,
+ 10000,
+ response -> {
+
+ started.stop();
+
+ if (response == null
+ || response.getResponseCommand() == null) {
+
+ future.complete(false);
+ return;
+ }
+
+ String resp = new String(
+ response.getResponseCommand().getBody());
+
+ log.info("async send result:{}, cost:{}ms, ip:{}",
+ resp,
+ started.elapsed().toMillis(),
+ agentCurrentIp);
+
+ future.complete(Objects.equals(resp, "ok"));
+ });
+
+ } catch (Exception e) {
+
+ log.error("send exception, agentIp:{}", agentIp, e);
+ future.complete(false);
+ }
+
+ return future;
+ }
+
@Override
public List<String> getAllAgentList() {
List<String> remoteAddress = Lists.newArrayList();
- List<String> ipAddress = Lists.newArrayList();
+// List<String> ipAddress = Lists.newArrayList();
+ List<String> finalRemoteAddress = remoteAddress;
AgentContext.ins().map.forEach((key, value) -> {
- remoteAddress.add(key);
- ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
+ finalRemoteAddress.add(key);
+// ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
});
- if (COUNT_INCR.getAndIncrement() % 200 == 0) {
+ if (remoteAddress.size() > 1000) {
+ remoteAddress = remoteAddress.subList(0, 600);
+ }
+ if (COUNT_INCR.getAndIncrement() % 1000 == 0) {
log.info("The set of remote addresses of the connected agent
machine is:{}", GSON.toJson(remoteAddress));
}
return remoteAddress;
diff --git
a/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
b/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
index 034fd7d5..16f4eb42 100644
---
a/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
+++
b/ozhera-log/log-agent-server/src/test/java/org/apache/ozhera/log/server/DefaultLogProcessCollectorTest.java
@@ -44,7 +44,7 @@ public class DefaultLogProcessCollectorTest {
@Before
public void buildBean() {
- Ioc.ins().init("com.xiaomi");
+ Ioc.ins().init("com.xiaomi", "org.apache.ozhera.log");
processCollector = Ioc.ins().getBean(DefaultLogProcessCollector.class);
gson = new GsonBuilder().create();
}
@@ -59,8 +59,10 @@ public class DefaultLogProcessCollectorTest {
progressDetail.setPattern("/home/work/log/test/server.log");
progressDetail.setCollectPercentage("98%");
fileProgressDetails.add(progressDetail);
+ fileProgressDetails.add(progressDetail);
collectDetail.setFileProgressDetails(fileProgressDetails);
collectList.add(collectDetail);
+ collectList.add(collectDetail);
updateLogProcessCmd.setCollectList(collectList);
updateLogProcessCmd.setIp("127.0.0.1");
processCollector.collectLogProcess(updateLogProcessCmd);
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index d0362c05..ac0da4e8 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-agent</artifactId>
- <version>2.2.16-SNAPSHOT</version>
+ <version>2.2.17-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 6ea04c38..8d9cee84 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -530,6 +530,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
Future<?> future = getExecutorServiceByType(logTypeEnum).submit(()
-> {
try {
log.info("filePath:{},is VirtualThread {},
thread:{},id:{}", filePath, Thread.currentThread().isVirtual(),
Thread.currentThread(), Thread.currentThread().threadId());
+ logFile.setStop(false);
logFile.readLine();
// Reset exception count on successful read
fileExceptionCountMap.remove(filePath);
diff --git a/ozhera-log/log-api/pom.xml b/ozhera-log/log-api/pom.xml
index d69b7cf2..bd3b1de2 100644
--- a/ozhera-log/log-api/pom.xml
+++ b/ozhera-log/log-api/pom.xml
@@ -29,6 +29,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-api</artifactId>
+ <version>2.2.7-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
index 8095a68a..bdafa568 100644
---
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
+++
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/PublishConfigService.java
@@ -32,5 +32,7 @@ public interface PublishConfigService {
void sengConfigToAgent(final String agentIp, LogCollectMeta
logCollectMeta);
+ void sengConfigToAgentSync(final String agentIp, LogCollectMeta
logCollectMeta);
+
List<String> getAllAgentList();
}
diff --git a/ozhera-log/log-manager/pom.xml b/ozhera-log/log-manager/pom.xml
index 395744dd..a2bdf3c1 100644
--- a/ozhera-log/log-manager/pom.xml
+++ b/ozhera-log/log-manager/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-manager</artifactId>
- <version>2.2.8-SNAPSHOT</version>
+ <version>2.2.9-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
@@ -305,11 +305,13 @@ http://www.apache.org/licenses/LICENSE-2.0
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
+ <version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.knuddels</groupId>
<artifactId>jtokkit</artifactId>
+ <version>1.1.0</version>
</dependency>
</dependencies>
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
index a278c817..af8a519e 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/agent/MilogAgentServiceImpl.java
@@ -22,6 +22,12 @@ import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.data.push.context.AgentContext;
import com.xiaomi.data.push.rpc.netty.AgentChannel;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.NamedThreadFactory;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.app.api.response.AppBaseInfo;
import org.apache.ozhera.log.api.enums.LogTypeEnum;
import org.apache.ozhera.log.api.enums.MQSourceEnum;
@@ -48,12 +54,6 @@ import
org.apache.ozhera.log.manager.service.impl.HeraAppServiceImpl;
import org.apache.ozhera.log.manager.service.impl.LogTailServiceImpl;
import org.apache.ozhera.log.manager.service.path.LogPathMapping;
import org.apache.ozhera.log.manager.service.path.LogPathMappingFactory;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.NamedThreadFactory;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import javax.annotation.Resource;
@@ -178,11 +178,14 @@ public class MilogAgentServiceImpl implements
MilogAgentService {
}
printMangerInfo();
AppBaseInfo appBaseInfo = heraAppService.queryById(milogAppId);
+ MilogLogTailDo milogLogTailDo = milogLogtailDao.queryById(tailId);
+ MilogLogStoreDO logStore =
logstoreDao.queryById(milogLogTailDo.getStoreId());
ips.forEach(ip -> {
AppLogMeta appLogMeta = assembleSingleConfig(milogAppId,
queryLogPattern(milogAppId, ip, appBaseInfo.getPlatformType()));
LogCollectMeta logCollectMeta = new LogCollectMeta();
logCollectMeta.setAgentIp(ip);
- logCollectMeta.setAppLogMetaList(Arrays.asList(appLogMeta));
+ logCollectMeta.setAgentMachine(null != logStore &&
StringUtils.isNotEmpty(logStore.getMachineRoom()) ? logStore.getMachineRoom() :
StringUtils.EMPTY);
+ logCollectMeta.setAppLogMetaList(List.of(appLogMeta));
AgentDefine agentDefine = new AgentDefine();
agentDefine.setFilters(new ArrayList<>());
logCollectMeta.setAgentDefine(agentDefine);
@@ -199,8 +202,8 @@ public class MilogAgentServiceImpl implements
MilogAgentService {
}
private void printMangerInfo() {
- List<String> remoteAddress = publishConfigService.getAllAgentList();
if (COUNT_INCR.getAndIncrement() % 200 == 0) {
+ List<String> remoteAddress =
publishConfigService.getAllAgentList();
log.info("The set of remote addresses for the connected agent
machine is:{}", gson.toJson(remoteAddress));
}
}
@@ -223,7 +226,7 @@ public class MilogAgentServiceImpl implements
MilogAgentService {
ips.forEach(ip -> {
LogCollectMeta logCollectMeta = new LogCollectMeta();
logCollectMeta.setAgentIp(ip);
- logCollectMeta.setAgentMachine("");
+ logCollectMeta.setAgentMachine(StringUtils.EMPTY);
logCollectMeta.setAgentId("");
logCollectMeta.setAppLogMetaList(Arrays.asList(appLogMeta));
sengConfigToAgent(ip, logCollectMeta);
@@ -297,7 +300,7 @@ public class MilogAgentServiceImpl implements
MilogAgentService {
private LogCollectMeta buildLogCollectMeta(String agentIp) {
LogCollectMeta logCollectMeta = new LogCollectMeta();
logCollectMeta.setAgentIp(agentIp);
- logCollectMeta.setAgentMachine("");
+ logCollectMeta.setAgentMachine(StringUtils.EMPTY);
logCollectMeta.setAgentId("");
return logCollectMeta;
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
index 034d7a74..790788d9 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MiLogMetaManageServiceImpl.java
@@ -19,6 +19,9 @@
package org.apache.ozhera.log.manager.service.impl;
import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.api.model.meta.AppLogMeta;
import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
import org.apache.ozhera.log.api.model.meta.LogPattern;
@@ -26,8 +29,6 @@ import org.apache.ozhera.log.manager.dao.MilogAppTopicRelDao;
import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
import org.apache.ozhera.log.manager.model.pojo.MilogLogTailDo;
import org.apache.ozhera.log.manager.service.MiLogMetaManageService;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.ArrayList;
@@ -55,7 +56,7 @@ public class MiLogMetaManageServiceImpl implements
MiLogMetaManageService {
LogCollectMeta meta = new LogCollectMeta();
meta.setAgentId(agentId);
meta.setAgentIp(agentIp);
- meta.setAgentMachine(null);
+ meta.setAgentMachine(StringUtils.EMPTY);
List<AppLogMeta> metaList = new ArrayList<>();
for (Map.Entry<Long, List<MilogLogTailDo>> entry :
miLogTailMap.entrySet()) {
AppLogMeta appLogMeta = new AppLogMeta();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]