This is an automated email from the ASF dual-hosted git repository.
dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 333d6244 fix: fix log statistics and pipeline calling logic (#619)
333d6244 is described below
commit 333d6244741be6d33e42fbf4bb8222b8896435ac
Author: wtt <[email protected]>
AuthorDate: Thu Nov 27 10:56:33 2025 +0800
fix: fix log statistics and pipeline calling logic (#619)
* fix: solve the blocking problem caused by serverless startup
* fix: fix log statistics and pipeline calling logic
---
.../log/agent/channel/ChannelServiceImpl.java | 13 ++++---
.../agent/channel/WildcardChannelServiceImpl.java | 43 +++++++++++-----------
.../log/agent/channel/pipeline/Pipeline.java | 4 +-
3 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 6611c0eb..73ce0461 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -51,6 +51,7 @@ import org.apache.ozhera.log.utils.NetUtil;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@@ -111,7 +112,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private volatile long lastSendTime = System.currentTimeMillis();
- private volatile long logCounts = 0;
+ private final AtomicLong logCounts = new AtomicLong();
private ScheduledFuture<?> scheduledFuture;
@@ -134,7 +135,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private String linePrefix;
- private Pipeline pipeline;
+ private final Pipeline pipeline;
public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService
memoryService,
ChannelDefine channelDefine, FilterChain chain,
Pipeline pipeline) {
@@ -478,7 +479,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String pattern, String patternCode, long ct) {
RequestContext requestContext =
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
- if (pipeline.invoke(requestContext)) {
+ if (!pipeline.invoke(requestContext)) {
return;
}
LineMessage lineMessage = createLineMessage(lineMsg, readResult,
pattern, patternCode, ct);
@@ -704,11 +705,11 @@ public class ChannelServiceImpl extends
AbstractChannelService {
long current = System.currentTimeMillis();
msgExporter.export(subList);
- logCounts += subList.size();
+ logCounts.addAndGet(subList.size());
lastSendTime = System.currentTimeMillis();
channelMemory.setCurrentTime(lastSendTime);
- log.info("doExport channelId:{}, send {} message, cost:{}, total
send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(),
lastSendTime - current, logCounts, instanceId());
+ log.info("doExport channelId:{}, send {} message, cost:{}, total
send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(),
lastSendTime - current, logCounts.get(), instanceId());
} catch (Exception e) {
log.error("doExport Exception:{}", e);
} finally {
@@ -969,7 +970,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
@Override
public Long getLogCounts() {
- return this.logCounts;
+ return this.logCounts.get();
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index ef62b742..f4c17222 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -52,6 +52,7 @@ import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -71,7 +72,7 @@ import static org.apache.ozhera.log.common.PathUtils.*;
@Slf4j
public class WildcardChannelServiceImpl extends AbstractChannelService {
- private AgentMemoryService memoryService;
+ private final AgentMemoryService memoryService;
private MsgExporter msgExporter;
@@ -79,41 +80,41 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
private ChannelMemory channelMemory;
- private FilterChain chain;
+ private final FilterChain chain;
private String logPattern;
private String linePrefix;
- private String memoryBasePath;
+ private final String memoryBasePath;
private static final String POINTER_FILENAME_PREFIX = ".ozhera_pointer";
- private List<LineMessage> lineMessageList = new ArrayList<>();
+ private final List<LineMessage> lineMessageList = new ArrayList<>();
private ScheduledFuture<?> scheduledFuture;
private ScheduledFuture<?> lastFileLineScheduledFuture;
- private List<Future<?>> fileCollFutures = Lists.newArrayList();
+ private final List<Future<?>> fileCollFutures = Lists.newArrayList();
private volatile long lastSendTime = System.currentTimeMillis();
- private volatile long logCounts = 0;
+ private final AtomicLong logCounts = new AtomicLong();
- private ReentrantLock reentrantLock = new ReentrantLock();
+ private final ReentrantLock reentrantLock = new ReentrantLock();
private DefaultMonitorListener defaultMonitorListener;
private HeraFileMonitor fileMonitor;
-
+
/**
* Track file truncation check time to avoid duplicate handling
*/
private final Map<String, Long> fileTruncationCheckMap = new
ConcurrentHashMap<>();
- private Pipeline pipeline;
+ private final Pipeline pipeline;
public WildcardChannelServiceImpl(MsgExporter msgExporter,
AgentMemoryService memoryService,
ChannelDefine channelDefine, FilterChain
chain,
@@ -217,7 +218,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
FileInfoCache.ins().remove(cacheKey);
continue;
}
-
+
// Check for file truncation (handles copytruncate log rotation)
checkFileTruncation(filePath);
}
@@ -236,29 +237,29 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
savedPointer = fileProgress.getPointer();
}
}
-
+
if (savedPointer <= 0) {
return;
}
-
+
File file = new File(filePath);
if (!file.exists() || savedPointer <= file.length()) {
return;
}
-
+
long lastCheckTime = fileTruncationCheckMap.getOrDefault(filePath,
0L);
long currentTime = System.currentTimeMillis();
long currentFileLength = file.length();
boolean isImmediateTruncation = currentFileLength == 0 ||
currentFileLength < savedPointer * 0.1;
-
+
if (isImmediateTruncation || (lastCheckTime > 0 && (currentTime -
lastCheckTime) > 15000)) {
log.warn("File truncation detected: pointer {} > length {} for
file:{}, restarting read",
savedPointer, currentFileLength, filePath);
-
+
// Shutdown and restart ReadListener for this file
for (ReadListener readListener :
defaultMonitorListener.getReadListenerList()) {
if (readListener instanceof
com.xiaomi.mone.file.listener.OzHeraReadListener) {
- com.xiaomi.mone.file.LogFile2 logFile =
+ com.xiaomi.mone.file.LogFile2 logFile =
((com.xiaomi.mone.file.listener.OzHeraReadListener) readListener).getLogFile();
if (logFile != null &&
filePath.equals(logFile.getFile())) {
logFile.shutdown();
@@ -273,7 +274,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
log.debug("Error checking file truncation for path:{}", filePath,
e);
}
}
-
+
// Check for inode changes: when a file with the same path is deleted and
recreated with a new inode
// DefaultMonitorListener handles file deletion, but may not detect inode
changes for files with the same path
private void checkAndShutdownInodeChangedReadListeners() {
@@ -445,7 +446,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String patternCode, long ct) {
RequestContext requestContext =
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
- if (pipeline.invoke(requestContext)) {
+ if (!pipeline.invoke(requestContext)) {
return;
}
String filePathName = readResult.get().getFilePathName();
@@ -472,11 +473,11 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
long current = System.currentTimeMillis();
msgExporter.export(subList);
- logCounts += subList.size();
+ logCounts.addAndGet(subList.size());
lastSendTime = System.currentTimeMillis();
channelMemory.setCurrentTime(lastSendTime);
- log.info("doExport channelId:{}, send {} message, cost:{}, total
send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(),
lastSendTime - current, logCounts, instanceId());
+ log.info("doExport channelId:{}, send {} message, cost:{}, total
send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(),
lastSendTime - current, logCounts.get(), instanceId());
} catch (Exception e) {
log.error("doExport Exception", e);
} finally {
@@ -522,7 +523,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
@Override
public Long getLogCounts() {
- return logCounts;
+ return logCounts.get();
}
@Override
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
index 511496a5..580c852f 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
@@ -25,8 +25,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.ServiceLoader;
-import static org.apache.ozhera.log.common.Constant.GSON;
-
/**
* @author wtt
* @date 2025/11/21 15:07
@@ -42,7 +40,7 @@ public class Pipeline {
valves.add(valve);
}
valves.sort(Comparator.naturalOrder());
- log.info("Pipeline valves: {}", GSON.toJson(valves));
+ log.info("pipeline valves: {}", valves);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]