This is an automated email from the ASF dual-hosted git repository.
gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 64c082ed feat: Optimize log collection exception handling and file
rotation detection mechanism (#614)
64c082ed is described below
commit 64c082eddcacaf5ded7f2edbd1f382c452e5f320
Author: Yandi Lee <[email protected]>
AuthorDate: Tue Nov 25 11:11:30 2025 +0800
feat: Optimize log collection exception handling and file rotation
detection mechanism (#614)
* fix: solve the blocking problem caused by serverless startup
* 优化日志采集异常处理和快速切分场景支持
- 新增文件读取异常自动重试机制(最多5次,区分异常类型)
- 增加运行时文件截断检测(15秒间隔,立即截断立即处理)
- 优化快速日志切分处理(关键事件1秒频率限制,检测间隔15秒)
- 改进文件不存在时的监控策略
- 修复快速切分场景下的数据丢失问题
* Add file truncation detection for copytruncate log rotation
- Add checkFileTruncation() to detect pointer > file.length()
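- Handle severe truncation (file emptied or shrunk below 10% of the saved
pointer) immediately; confirm gradual truncation over 15s before acting
- Automatically restart the ReadListener when truncation is detected
- Complement inode-change detection so that both rename/move and
copytruncate log rotation are covered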
* Optimize file truncation detection
Truncation detection during log rotation now reads the saved pointer
directly from channel memory instead of iterating through the read
listeners, which is more efficient.
Add a one-shot schedule() method to ExecutorUtil.
---------
Co-authored-by: wtt <[email protected]>
Co-authored-by: wtt <[email protected]>
---
ozhera-log/log-agent/pom.xml | 2 +-
.../log/agent/channel/ChannelServiceImpl.java | 202 ++++++++++++++++++++-
.../agent/channel/WildcardChannelServiceImpl.java | 61 +++++++
.../ozhera/log/agent/common/ExecutorUtil.java | 6 +
4 files changed, 261 insertions(+), 10 deletions(-)
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index b2145662..3a29698a 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-agent</artifactId>
- <version>2.2.13-SNAPSHOT</version>
+ <version>2.2.14-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 97ce773b..d4eb0dae 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -86,6 +86,16 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
private final Map<String, Pair<MLog, AtomicReference<ReadResult>>>
resultMap = new ConcurrentHashMap<>();
+
+ /**
+ * Consecutive read-failure count per file path, used by the read retry mechanism.
+ * Incremented in handleFileReadException and removed after a successful readLine().
+ */
+ private final Map<String, Integer> fileExceptionCountMap = new
ConcurrentHashMap<>();
+
+ /**
+ * Per-file timestamp (System.currentTimeMillis()) written by checkFileTruncation while the
+ * saved read pointer exceeds the file length; used to confirm a gradual truncation and
+ * removed when a check finds the pointer within the file length again.
+ */
+ private final Map<String, Long> fileTruncationCheckMap = new
ConcurrentHashMap<>();
private ScheduledFuture<?> lastFileLineScheduledFuture;
@@ -245,6 +255,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void startDeletedFileCleanupTask() {
// Periodically cleanup files that no longer exist or have inode
changes to release file handles
+ // Check more frequently (every 15 seconds) to reduce truncation
detection delay for fast rotation
deletedFileCleanupFuture = ExecutorUtil.scheduleAtFixedRate(() ->
SafeRun.run(() -> {
for (String path : new ArrayList<>(logFileMap.keySet())) {
if (!FileUtil.exist(path)) {
@@ -258,9 +269,80 @@ public class ChannelServiceImpl extends
AbstractChannelService {
log.info("deleted file cleanup trigger for inode changed
path:{}, oldInode:{}, newInode:{}",
path, inodeInfo != null ? inodeInfo[0] :
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
cleanFile(path::equals);
+ // Inode change usually means log rotation (move/rename),
trigger reOpen to restart reading
+ // reOpen has complete logic including frequency limit and
state checks
+ reOpen(path);
+ continue;
}
+
+ // Check for file truncation during runtime (handles
copytruncate rotation)
+ checkFileTruncation(path);
+ }
+ }), 15, 15, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Check if file has been truncated during runtime (handles copytruncate
log rotation)
+ */
+ private void checkFileTruncation(String filePath) {
+ try {
+ ChannelMemory.FileProgress fileProgress =
channelMemory.getFileProgressMap().get(filePath);
+ if (fileProgress == null || fileProgress.getPointer() <= 0) {
+ return;
+ }
+
+ java.io.File file = new java.io.File(filePath);
+ if (!file.exists()) {
+ return;
+ }
+
+ long savedPointer = fileProgress.getPointer();
+ long currentFileLength = file.length();
+
+ if (savedPointer > currentFileLength) {
+ long lastCheckTime =
fileTruncationCheckMap.getOrDefault(filePath, 0L);
+ long currentTime = System.currentTimeMillis();
+ boolean isImmediateTruncation = currentFileLength == 0 ||
currentFileLength < savedPointer * 0.1;
+
+ // For immediate truncation, handle immediately; for gradual,
reduce confirmation time to 15 seconds
+ if (isImmediateTruncation || (lastCheckTime > 0 &&
(currentTime - lastCheckTime) > 15000)) {
+ log.warn("File truncation detected: pointer {} > length {}
for file:{}, resetting to 0",
+ savedPointer, currentFileLength, filePath);
+ fileProgress.setPointer(0L);
+ fileProgress.setCurrentRowNum(0L);
+
+ ILogFile logFile = logFileMap.get(filePath);
+ if (logFile != null && !logFile.getExceptionFinish()) {
+ handleFileTruncationReopen(filePath);
+ }
+ }
+ fileTruncationCheckMap.put(filePath, currentTime);
+ } else {
+ fileTruncationCheckMap.remove(filePath);
+ }
+ } catch (Exception e) {
+ log.debug("Error checking file truncation for path:{}", filePath,
e);
+ }
+ }
+
+ /**
+ * Restarts reading of filePath after a truncation was detected. It does not go through
+ * reOpen(), so reOpen()'s reopen frequency limit does not apply.
+ * If there is no ILogFile for the path, or it is exception-finished, a new read is started
+ * with readFile(); otherwise stopOldCurrentFileThread() is called first and then readFile().
+ * Errors are logged and not rethrown.
+ * NOTE(review): this is also invoked from inside the reading task when a read fails with a
+ * truncation-like exception; confirm stopOldCurrentFileThread() is safe to call from the
+ * thread it may be stopping.
+ *
+ * @param filePath path of the truncated file
+ */
+ private void handleFileTruncationReopen(String filePath) {
+ try {
+ ILogFile logFile = logFileMap.get(filePath);
+ // No usable reader (absent, or finished after an error): just start a new read
+ if (logFile == null || logFile.getExceptionFinish()) {
+ readFile(channelDefine.getInput().getPatternCode(), filePath,
getChannelId());
+ log.info("Recreated file handle after truncation, file:{}",
filePath);
+ } else {
+ // A reader is still active: stop its thread before starting a new read
+ stopOldCurrentFileThread(filePath);
+ readFile(channelDefine.getInput().getPatternCode(), filePath,
getChannelId());
+ log.info("Restarted file reading after truncation, file:{}",
filePath);
}
- }), 60, 60, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ // Logged only; the failure is not propagated to the caller
+ log.error("Error handling file truncation reopen, file:{}",
filePath, e);
+ }
}
@@ -435,14 +517,94 @@ public class ChannelServiceImpl extends
AbstractChannelService {
try {
log.info("filePath:{},is VirtualThread {},
thread:{},id:{}", filePath, Thread.currentThread().isVirtual(),
Thread.currentThread(), Thread.currentThread().threadId());
logFile.readLine();
+ // Reset exception count on successful read
+ fileExceptionCountMap.remove(filePath);
} catch (Exception e) {
- logFile.setExceptionFinish();
- log.error("logFile read line
err,channelId:{},file:{},patternCode:{}", channelId, fileProgressMap,
patternCode, e);
+ if (isTruncationException(e, filePath)) {
+ log.warn("File truncation detected during read,
file:{}, will restart reading", filePath, e);
+ handleFileTruncationReopen(filePath);
+ } else {
+ handleFileReadException(filePath, e, patternCode,
channelId);
+ }
}
});
futureMap.put(filePath, future);
} else {
- log.info("file not exist,file:{}", filePath);
+ log.warn("file not exist,file:{}, will monitor for file creation",
filePath);
+ }
+ }
+
+    /**
+     * Decides whether a read failure looks like it was caused by the file being truncated.
+     * <p>
+     * Two independent signals are checked: the saved read pointer being larger than the current
+     * file length, and the exception message (case-insensitively, locale-independent) mentioning
+     * truncation or an invalid / out-of-range position. A failure while probing the pointer no
+     * longer prevents the message from being inspected.
+     *
+     * @param e        the exception thrown while reading
+     * @param filePath path of the file that was being read
+     * @return {@code true} if the failure should be handled as a truncation
+     */
+    private boolean isTruncationException(Exception e, String filePath) {
+        if (isPointerBeyondFileEnd(filePath)) {
+            return true;
+        }
+        String errorMsg = e.getMessage();
+        if (errorMsg == null) {
+            return false;
+        }
+        // Locale.ROOT: default-locale lower-casing (e.g. Turkish dotless i) would break "invalid"
+        String lowerMsg = errorMsg.toLowerCase(java.util.Locale.ROOT);
+        return lowerMsg.contains("truncat") || lowerMsg.contains("invalid position")
+                || lowerMsg.contains("position out of range");
+    }
+
+    /**
+     * Returns {@code true} when the pointer saved in channelMemory for {@code filePath} exceeds the
+     * file's current length. Missing progress, a missing file, or any probing error yields {@code false}.
+     */
+    private boolean isPointerBeyondFileEnd(String filePath) {
+        try {
+            ChannelMemory.FileProgress fileProgress = channelMemory.getFileProgressMap().get(filePath);
+            Long pointer = fileProgress == null ? null : fileProgress.getPointer();
+            if (pointer == null) {
+                return false;
+            }
+            java.io.File file = new java.io.File(filePath);
+            return file.exists() && pointer > file.length();
+        } catch (Exception ignored) {
+            // Best-effort probe; the caller falls back to inspecting the exception message
+            return false;
+        }
+    }
+
+    /**
+     * Handles a non-truncation failure of {@code logFile.readLine()} for one file.
+     * <ul>
+     *   <li>Permission errors: logged, and the file's ILogFile is marked exception-finished.</li>
+     *   <li>File-not-found errors: logged only; no retry is scheduled here.</li>
+     *   <li>Anything else: readFile() is re-attempted after 30 seconds (if the file still exists),
+     *       up to 5 consecutive failures; after that the ILogFile is marked exception-finished.</li>
+     * </ul>
+     * The consecutive-failure counter lives in fileExceptionCountMap; the reading task removes it
+     * after a successful readLine().
+     *
+     * @param filePath    file whose read failed
+     * @param e           the failure
+     * @param patternCode pattern code, logged and passed back to readFile() on retry
+     * @param channelId   channel id, logged and passed back to readFile() on retry
+     */
+    private void handleFileReadException(String filePath, Exception e, String patternCode, Long channelId) {
+        final int MAX_RETRY_COUNT = 5;
+        final long RETRY_DELAY_SECONDS = 30;
+
+        // merge() is a single atomic update on the ConcurrentHashMap, unlike getOrDefault() + put()
+        int exceptionCount = fileExceptionCountMap.merge(filePath, 1, Integer::sum);
+
+        String errorMsg = e.getMessage();
+        // Case-insensitive matching ("Permission denied", "Access denied", ...), independent of JVM locale
+        String lowerMsg = errorMsg == null ? "" : errorMsg.toLowerCase(java.util.Locale.ROOT);
+        boolean isPermissionError = e instanceof java.nio.file.AccessDeniedException
+                || lowerMsg.contains("permission denied") || lowerMsg.contains("access denied");
+        boolean isFileNotFound = e instanceof java.io.FileNotFoundException
+                || lowerMsg.contains("no such file");
+
+        // Permission problems are not expected to resolve by retrying: stop this reader
+        if (isPermissionError) {
+            log.error("File permission denied, channelId:{}, file:{}, patternCode:{}, exceptionCount:{}",
+                    channelId, filePath, patternCode, exceptionCount, e);
+            ILogFile logFile = logFileMap.get(filePath);
+            if (logFile != null) {
+                logFile.setExceptionFinish();
+            }
+            return;
+        }
+
+        // A missing file is only logged here; no retry is scheduled from this method
+        if (isFileNotFound) {
+            log.warn("File not found during read, channelId:{}, file:{}, exceptionCount:{}, error:{}, will retry when file is created",
+                    channelId, filePath, exceptionCount, errorMsg);
+            return;
+        }
+
+        if (exceptionCount <= MAX_RETRY_COUNT) {
+            log.warn("File read error (retry {}/{}), channelId:{}, file:{}, will retry in {} seconds",
+                    exceptionCount, MAX_RETRY_COUNT, channelId, filePath, RETRY_DELAY_SECONDS, e);
+            ExecutorUtil.schedule(() -> SafeRun.run(() -> {
+                if (FileUtil.exist(filePath)) {
+                    log.info("Retrying file read after error, file:{}", filePath);
+                    readFile(patternCode, filePath, channelId);
+                }
+            }), RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
+        } else {
+            log.error("File read error exceeded max retries ({}), channelId:{}, file:{}, marking as finished",
+                    MAX_RETRY_COUNT, channelId, filePath, e);
+            ILogFile logFile = logFileMap.get(filePath);
+            if (logFile != null) {
+                logFile.setExceptionFinish();
+            }
         }
     }
@@ -573,6 +735,8 @@ public class ChannelServiceImpl extends
AbstractChannelService {
reOpenMap.clear();
fileReadMap.clear();
resultMap.clear();
+ fileExceptionCountMap.clear();
+ fileTruncationCheckMap.clear();
}
public Long getChannelId() {
@@ -597,17 +761,29 @@ public class ChannelServiceImpl extends
AbstractChannelService {
public void reOpen(String filePath) {
fileReopenLock.lock();
try {
- //Judging the number of openings, it can only be reopened once
within 10 seconds.
- final long REOPEN_THRESHOLD = 10 * 1000;
+ // Check if this is a critical rotation event (inode change or
truncation) that needs immediate handling
+ boolean isCriticalRotation =
ChannelUtil.isInodeChanged(channelMemory, filePath);
+ if (!isCriticalRotation && FileUtil.exist(filePath)) {
+ ChannelMemory.FileProgress fileProgress =
channelMemory.getFileProgressMap().get(filePath);
+ if (fileProgress != null && fileProgress.getPointer() > 0) {
+ java.io.File file = new java.io.File(filePath);
+ // Check if file was truncated (common in fast rotation
scenarios)
+ isCriticalRotation = fileProgress.getPointer() >
file.length();
+ }
+ }
+
+ // For critical rotation events, bypass frequency limit to avoid
data loss
+ final long REOPEN_THRESHOLD = isCriticalRotation ? 1000 : 10 *
1000; // 1 second for critical, 10 seconds for normal
- if (reOpenMap.containsKey(filePath) &&
Instant.now().toEpochMilli() - reOpenMap.get(filePath) < REOPEN_THRESHOLD) {
+ if (!isCriticalRotation && reOpenMap.containsKey(filePath) &&
+ Instant.now().toEpochMilli() - reOpenMap.get(filePath) <
REOPEN_THRESHOLD) {
log.info("The file has been opened too frequently.Please try
again in 10 seconds.fileName:{}," +
"last time opening time.:{}", filePath,
reOpenMap.get(filePath));
return;
}
reOpenMap.put(filePath, Instant.now().toEpochMilli());
- log.info("reOpen file:{}", filePath);
+ log.info("reOpen file:{}, isCriticalRotation:{}", filePath,
isCriticalRotation);
if (collectOnce) {
handleAllFileCollectMonitor(channelDefine.getInput().getPatternCode(),
filePath, getChannelId());
@@ -618,7 +794,6 @@ public class ChannelServiceImpl extends
AbstractChannelService {
String tailPodIp = getTailPodIp(filePath);
String ip = StringUtils.isBlank(tailPodIp) ? NetUtil.getLocalIp()
: tailPodIp;
if (null == logFile || logFile.getExceptionFinish()) {
- // Add new log file
readFile(channelDefine.getInput().getPatternCode(), filePath,
getChannelId());
log.info("watch new file create for
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
} else {
@@ -770,6 +945,15 @@ public class ChannelServiceImpl extends
AbstractChannelService {
for (String delFile : delFiles) {
resultMap.remove(delFile);
}
+
+ // Clean up exception count and truncation check maps
+ delFiles = fileExceptionCountMap.keySet().stream()
+ .filter(filePath -> filter.test(filePath))
+ .collect(Collectors.toList());
+ for (String delFile : delFiles) {
+ fileExceptionCountMap.remove(delFile);
+ fileTruncationCheckMap.remove(delFile);
+ }
}
@Override
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index a991ecc4..b1778881 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -108,6 +108,11 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
private DefaultMonitorListener defaultMonitorListener;
private HeraFileMonitor fileMonitor;
+
+ /**
+ * Per-file timestamp (System.currentTimeMillis()) written by checkFileTruncation while the
+ * saved read pointer exceeds the file length; used to confirm a gradual truncation before
+ * the file's read listener is shut down.
+ */
+ private final Map<String, Long> fileTruncationCheckMap = new
ConcurrentHashMap<>();
public WildcardChannelServiceImpl(MsgExporter msgExporter,
AgentMemoryService memoryService,
@@ -208,10 +213,65 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
if (!file.exists()) {
log.info("cleanUpInvalidFileInfos: file no longer exists,
removing from cache, cacheKey:{}, filePath:{}", cacheKey, filePath);
FileInfoCache.ins().remove(cacheKey);
+ continue;
}
+
+ // Check for file truncation (handles copytruncate log rotation)
+ checkFileTruncation(filePath);
}
}
+ /**
+ * Check if file has been truncated during runtime (handles copytruncate
log rotation)
+ */
+ private void checkFileTruncation(String filePath) {
+ try {
+ // Get saved pointer from channelMemory
+ long savedPointer = 0L;
+ if (channelMemory != null) {
+ ChannelMemory.FileProgress fileProgress =
channelMemory.getFileProgressMap().get(filePath);
+ if (fileProgress != null && fileProgress.getPointer() != null)
{
+ savedPointer = fileProgress.getPointer();
+ }
+ }
+
+ if (savedPointer <= 0) {
+ return;
+ }
+
+ File file = new File(filePath);
+ if (!file.exists() || savedPointer <= file.length()) {
+ return;
+ }
+
+ long lastCheckTime = fileTruncationCheckMap.getOrDefault(filePath,
0L);
+ long currentTime = System.currentTimeMillis();
+ long currentFileLength = file.length();
+ boolean isImmediateTruncation = currentFileLength == 0 ||
currentFileLength < savedPointer * 0.1;
+
+ if (isImmediateTruncation || (lastCheckTime > 0 && (currentTime -
lastCheckTime) > 15000)) {
+ log.warn("File truncation detected: pointer {} > length {} for
file:{}, restarting read",
+ savedPointer, currentFileLength, filePath);
+
+ // Shutdown and restart ReadListener for this file
+ for (ReadListener readListener :
defaultMonitorListener.getReadListenerList()) {
+ if (readListener instanceof
com.xiaomi.mone.file.listener.OzHeraReadListener) {
+ com.xiaomi.mone.file.LogFile2 logFile =
+
((com.xiaomi.mone.file.listener.OzHeraReadListener) readListener).getLogFile();
+ if (logFile != null &&
filePath.equals(logFile.getFile())) {
+ logFile.shutdown();
+
defaultMonitorListener.remove(logFile.getFileKey());
+ break;
+ }
+ }
+ }
+ }
+ fileTruncationCheckMap.put(filePath, currentTime);
+ } catch (Exception e) {
+ log.debug("Error checking file truncation for path:{}", filePath,
e);
+ }
+ }
+
// Check for inode changes: when a file with the same path is deleted and
recreated with a new inode
// DefaultMonitorListener handles file deletion, but may not detect inode
changes for files with the same path
private void checkAndShutdownInodeChangedReadListeners() {
@@ -522,5 +582,6 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
fileCollFuture.cancel(false);
}
lineMessageList.clear();
+ fileTruncationCheckMap.clear();
}
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index dce80f31..4c28c01d 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -49,6 +49,12 @@ public class ExecutorUtil {
return STP_EXECUTOR.scheduleAtFixedRate(command, initialDelay, period,
unit);
}
+ public static ScheduledFuture<?> schedule(Runnable command,
+ long delay,
+ TimeUnit unit) {
+ return STP_EXECUTOR.schedule(command, delay, unit);
+ }
+
public static ExecutorService createPool(String name) {
String configValue =
ConfigUtils.getConfigValue("jdk.virtualThreadScheduler.parallelism");
if (StringUtils.isEmpty(configValue) || Objects.equals("-1",
configValue)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]