This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c082ed feat: Optimize log collection exception handling and file 
rotation detection mechanism (#614)
64c082ed is described below

commit 64c082eddcacaf5ded7f2edbd1f382c452e5f320
Author: Yandi Lee <[email protected]>
AuthorDate: Tue Nov 25 11:11:30 2025 +0800

    feat: Optimize log collection exception handling and file rotation 
detection mechanism (#614)
    
    * fix: solve the blocking problem caused by serverless startup
    
    * 优化日志采集异常处理和快速切分场景支持
    
    - 新增文件读取异常自动重试机制(最多5次,区分异常类型)
    - 增加运行时文件截断检测(15秒间隔,立即截断立即处理)
    - 优化快速日志切分处理(关键事件1秒频率限制,检测间隔15秒)
    - 改进文件不存在时的监控策略
    - 修复快速切分场景下的数据丢失问题
    
    * Add file truncation detection for copytruncate log rotation
    
    - Add checkFileTruncation() to detect pointer > file.length()
    - Handle abrupt truncation (file emptied, or shrunk below 10% of the saved
      pointer) at once; re-confirm gradual truncation on a later check more
      than 15s after the previous one
    - Automatically restart ReadListener when truncation detected
    - Complement inode-change detection so that copytruncate rotation
      (same path and inode, truncated file) is also covered
    
    * Optimizes file truncation detection
    
    Enhances file truncation detection during log rotation by reading the
    saved pointer directly from channel memory instead of iterating over
    read listeners, which is more efficient.
    
    Adds a one-shot schedule(Runnable, long, TimeUnit) method to ExecutorUtil.
    
    ---------
    
    Co-authored-by: wtt <[email protected]>
    Co-authored-by: wtt <[email protected]>
---
 ozhera-log/log-agent/pom.xml                       |   2 +-
 .../log/agent/channel/ChannelServiceImpl.java      | 202 ++++++++++++++++++++-
 .../agent/channel/WildcardChannelServiceImpl.java  |  61 +++++++
 .../ozhera/log/agent/common/ExecutorUtil.java      |   6 +
 4 files changed, 261 insertions(+), 10 deletions(-)

diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index b2145662..3a29698a 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-agent</artifactId>
-    <version>2.2.13-SNAPSHOT</version>
+    <version>2.2.14-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 97ce773b..d4eb0dae 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -86,6 +86,16 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
 
     private final Map<String, Pair<MLog, AtomicReference<ReadResult>>> 
resultMap = new ConcurrentHashMap<>();
+    
+    /**
+     * Track file read exceptions for retry mechanism
+     */
+    private final Map<String, Integer> fileExceptionCountMap = new 
ConcurrentHashMap<>();
+    
+    /**
+     * Track last file truncation check time
+     */
+    private final Map<String, Long> fileTruncationCheckMap = new 
ConcurrentHashMap<>();
 
     private ScheduledFuture<?> lastFileLineScheduledFuture;
 
@@ -245,6 +255,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private void startDeletedFileCleanupTask() {
         // Periodically cleanup files that no longer exist or have inode 
changes to release file handles
+        // Check more frequently (every 15 seconds) to reduce truncation 
detection delay for fast rotation
         deletedFileCleanupFuture = ExecutorUtil.scheduleAtFixedRate(() -> 
SafeRun.run(() -> {
             for (String path : new ArrayList<>(logFileMap.keySet())) {
                 if (!FileUtil.exist(path)) {
@@ -258,9 +269,80 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                     log.info("deleted file cleanup trigger for inode changed 
path:{}, oldInode:{}, newInode:{}",
                             path, inodeInfo != null ? inodeInfo[0] : 
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
                     cleanFile(path::equals);
+                    // Inode change usually means log rotation (move/rename), 
trigger reOpen to restart reading
+                    // reOpen has complete logic including frequency limit and 
state checks
+                    reOpen(path);
+                    continue;
                 }
+                
+                // Check for file truncation during runtime (handles 
copytruncate rotation)
+                checkFileTruncation(path);
+            }
+        }), 15, 15, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Check if file has been truncated during runtime (handles copytruncate 
log rotation)
+     */
+    private void checkFileTruncation(String filePath) {
+        try {
+            ChannelMemory.FileProgress fileProgress = 
channelMemory.getFileProgressMap().get(filePath);
+            if (fileProgress == null || fileProgress.getPointer() <= 0) {
+                return;
+            }
+            
+            java.io.File file = new java.io.File(filePath);
+            if (!file.exists()) {
+                return;
+            }
+            
+            long savedPointer = fileProgress.getPointer();
+            long currentFileLength = file.length();
+            
+            if (savedPointer > currentFileLength) {
+                long lastCheckTime = 
fileTruncationCheckMap.getOrDefault(filePath, 0L);
+                long currentTime = System.currentTimeMillis();
+                boolean isImmediateTruncation = currentFileLength == 0 || 
currentFileLength < savedPointer * 0.1;
+                
+                // For immediate truncation, handle immediately; for gradual, 
reduce confirmation time to 15 seconds
+                if (isImmediateTruncation || (lastCheckTime > 0 && 
(currentTime - lastCheckTime) > 15000)) {
+                    log.warn("File truncation detected: pointer {} > length {} 
for file:{}, resetting to 0",
+                            savedPointer, currentFileLength, filePath);
+                    fileProgress.setPointer(0L);
+                    fileProgress.setCurrentRowNum(0L);
+                    
+                    ILogFile logFile = logFileMap.get(filePath);
+                    if (logFile != null && !logFile.getExceptionFinish()) {
+                        handleFileTruncationReopen(filePath);
+                    }
+                }
+                fileTruncationCheckMap.put(filePath, currentTime);
+            } else {
+                fileTruncationCheckMap.remove(filePath);
+            }
+        } catch (Exception e) {
+            log.debug("Error checking file truncation for path:{}", filePath, 
e);
+        }
+    }
+    
+    /**
+     * Handle file reopen for truncation cases, bypassing frequency limit
+     * Optimized for fast rotation scenarios
+     */
+    private void handleFileTruncationReopen(String filePath) {
+        try {
+            ILogFile logFile = logFileMap.get(filePath);
+            if (logFile == null || logFile.getExceptionFinish()) {
+                readFile(channelDefine.getInput().getPatternCode(), filePath, 
getChannelId());
+                log.info("Recreated file handle after truncation, file:{}", 
filePath);
+            } else {
+                stopOldCurrentFileThread(filePath);
+                readFile(channelDefine.getInput().getPatternCode(), filePath, 
getChannelId());
+                log.info("Restarted file reading after truncation, file:{}", 
filePath);
             }
-        }), 60, 60, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error handling file truncation reopen, file:{}", 
filePath, e);
+        }
     }
 
 
@@ -435,14 +517,94 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                 try {
                     log.info("filePath:{},is VirtualThread {}, 
thread:{},id:{}", filePath, Thread.currentThread().isVirtual(), 
Thread.currentThread(), Thread.currentThread().threadId());
                     logFile.readLine();
+                    // Reset exception count on successful read
+                    fileExceptionCountMap.remove(filePath);
                 } catch (Exception e) {
-                    logFile.setExceptionFinish();
-                    log.error("logFile read line 
err,channelId:{},file:{},patternCode:{}", channelId, fileProgressMap, 
patternCode, e);
+                    if (isTruncationException(e, filePath)) {
+                        log.warn("File truncation detected during read, 
file:{}, will restart reading", filePath, e);
+                        handleFileTruncationReopen(filePath);
+                    } else {
+                        handleFileReadException(filePath, e, patternCode, 
channelId);
+                    }
                 }
             });
             futureMap.put(filePath, future);
         } else {
-            log.info("file not exist,file:{}", filePath);
+            log.warn("file not exist,file:{}, will monitor for file creation", 
filePath);
+        }
+    }
+    
+    /**
+     * Check if exception is related to file truncation
+     */
+    private boolean isTruncationException(Exception e, String filePath) {
+        try {
+            ChannelMemory.FileProgress fileProgress = 
channelMemory.getFileProgressMap().get(filePath);
+            if (fileProgress != null) {
+                java.io.File file = new java.io.File(filePath);
+                if (file.exists() && fileProgress.getPointer() > 
file.length()) {
+                    return true;
+                }
+            }
+            String errorMsg = e.getMessage();
+            if (errorMsg != null) {
+                String lowerMsg = errorMsg.toLowerCase();
+                return lowerMsg.contains("truncat") || 
lowerMsg.contains("invalid position") || lowerMsg.contains("position out of 
range");
+            }
+        } catch (Exception ignored) {
+            // Ignore check errors
+        }
+        return false;
+    }
+    
+    /**
+     * Handle file read exceptions with retry mechanism and error 
classification
+     */
+    private void handleFileReadException(String filePath, Exception e, String 
patternCode, Long channelId) {
+        int exceptionCount = fileExceptionCountMap.getOrDefault(filePath, 0) + 
1;
+        fileExceptionCountMap.put(filePath, exceptionCount);
+        
+        String errorMsg = e.getMessage();
+        boolean isPermissionError = e instanceof 
java.nio.file.AccessDeniedException 
+                || (errorMsg != null && (errorMsg.contains("Permission 
denied") || errorMsg.contains("access denied")));
+        boolean isFileNotFound = e instanceof java.io.FileNotFoundException 
+                || (errorMsg != null && errorMsg.contains("No such file"));
+        
+        if (isPermissionError) {
+            log.error("File permission denied, channelId:{}, file:{}, 
patternCode:{}, exceptionCount:{}", 
+                    channelId, filePath, patternCode, exceptionCount, e);
+            ILogFile logFile = logFileMap.get(filePath);
+            if (logFile != null) {
+                logFile.setExceptionFinish();
+            }
+            return;
+        }
+        
+        if (isFileNotFound) {
+            log.warn("File not found during read, channelId:{}, file:{}, will 
retry when file is created", 
+                    channelId, filePath, exceptionCount);
+            return;
+        }
+        
+        final int MAX_RETRY_COUNT = 5;
+        final long RETRY_DELAY_SECONDS = 30;
+        
+        if (exceptionCount <= MAX_RETRY_COUNT) {
+            log.warn("File read error (retry {}/{}), channelId:{}, file:{}, 
will retry in {} seconds", 
+                    exceptionCount, MAX_RETRY_COUNT, channelId, filePath, 
RETRY_DELAY_SECONDS, e);
+            ExecutorUtil.schedule(() -> SafeRun.run(() -> {
+                if (FileUtil.exist(filePath)) {
+                    log.info("Retrying file read after error, file:{}", 
filePath);
+                    readFile(patternCode, filePath, channelId);
+                }
+            }), RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
+        } else {
+            log.error("File read error exceeded max retries ({}), 
channelId:{}, file:{}, marking as finished", 
+                    MAX_RETRY_COUNT, channelId, filePath, e);
+            ILogFile logFile = logFileMap.get(filePath);
+            if (logFile != null) {
+                logFile.setExceptionFinish();
+            }
         }
     }
 
@@ -573,6 +735,8 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         reOpenMap.clear();
         fileReadMap.clear();
         resultMap.clear();
+        fileExceptionCountMap.clear();
+        fileTruncationCheckMap.clear();
     }
 
     public Long getChannelId() {
@@ -597,17 +761,29 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     public void reOpen(String filePath) {
         fileReopenLock.lock();
         try {
-            //Judging the number of openings, it can only be reopened once 
within 10 seconds.
-            final long REOPEN_THRESHOLD = 10 * 1000;
+            // Check if this is a critical rotation event (inode change or 
truncation) that needs immediate handling
+            boolean isCriticalRotation = 
ChannelUtil.isInodeChanged(channelMemory, filePath);
+            if (!isCriticalRotation && FileUtil.exist(filePath)) {
+                ChannelMemory.FileProgress fileProgress = 
channelMemory.getFileProgressMap().get(filePath);
+                if (fileProgress != null && fileProgress.getPointer() > 0) {
+                    java.io.File file = new java.io.File(filePath);
+                    // Check if file was truncated (common in fast rotation 
scenarios)
+                    isCriticalRotation = fileProgress.getPointer() > 
file.length();
+                }
+            }
+            
+            // For critical rotation events, bypass frequency limit to avoid 
data loss
+            final long REOPEN_THRESHOLD = isCriticalRotation ? 1000 : 10 * 
1000; // 1 second for critical, 10 seconds for normal
 
-            if (reOpenMap.containsKey(filePath) && 
Instant.now().toEpochMilli() - reOpenMap.get(filePath) < REOPEN_THRESHOLD) {
+            if (!isCriticalRotation && reOpenMap.containsKey(filePath) && 
+                    Instant.now().toEpochMilli() - reOpenMap.get(filePath) < 
REOPEN_THRESHOLD) {
                 log.info("The file has been opened too frequently.Please try 
again in 10 seconds.fileName:{}," +
                         "last time opening time.:{}", filePath, 
reOpenMap.get(filePath));
                 return;
             }
 
             reOpenMap.put(filePath, Instant.now().toEpochMilli());
-            log.info("reOpen file:{}", filePath);
+            log.info("reOpen file:{}, isCriticalRotation:{}", filePath, 
isCriticalRotation);
 
             if (collectOnce) {
                 
handleAllFileCollectMonitor(channelDefine.getInput().getPatternCode(), 
filePath, getChannelId());
@@ -618,7 +794,6 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
             String tailPodIp = getTailPodIp(filePath);
             String ip = StringUtils.isBlank(tailPodIp) ? NetUtil.getLocalIp() 
: tailPodIp;
             if (null == logFile || logFile.getExceptionFinish()) {
-                // Add new log file
                 readFile(channelDefine.getInput().getPatternCode(), filePath, 
getChannelId());
                 log.info("watch new file create for 
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
             } else {
@@ -770,6 +945,15 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         for (String delFile : delFiles) {
             resultMap.remove(delFile);
         }
+        
+        // Clean up exception count and truncation check maps
+        delFiles = fileExceptionCountMap.keySet().stream()
+                .filter(filePath -> filter.test(filePath))
+                .collect(Collectors.toList());
+        for (String delFile : delFiles) {
+            fileExceptionCountMap.remove(delFile);
+            fileTruncationCheckMap.remove(delFile);
+        }
     }
 
     @Override
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index a991ecc4..b1778881 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -108,6 +108,11 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
     private DefaultMonitorListener defaultMonitorListener;
 
     private HeraFileMonitor fileMonitor;
+    
+    /**
+     * Track file truncation check time to avoid duplicate handling
+     */
+    private final Map<String, Long> fileTruncationCheckMap = new 
ConcurrentHashMap<>();
 
 
     public WildcardChannelServiceImpl(MsgExporter msgExporter, 
AgentMemoryService memoryService,
@@ -208,10 +213,65 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             if (!file.exists()) {
                 log.info("cleanUpInvalidFileInfos: file no longer exists, 
removing from cache, cacheKey:{}, filePath:{}", cacheKey, filePath);
                 FileInfoCache.ins().remove(cacheKey);
+                continue;
             }
+            
+            // Check for file truncation (handles copytruncate log rotation)
+            checkFileTruncation(filePath);
         }
     }
 
+    /**
+     * Check if file has been truncated during runtime (handles copytruncate 
log rotation)
+     */
+    private void checkFileTruncation(String filePath) {
+        try {
+            // Get saved pointer from channelMemory
+            long savedPointer = 0L;
+            if (channelMemory != null) {
+                ChannelMemory.FileProgress fileProgress = 
channelMemory.getFileProgressMap().get(filePath);
+                if (fileProgress != null && fileProgress.getPointer() != null) 
{
+                    savedPointer = fileProgress.getPointer();
+                }
+            }
+            
+            if (savedPointer <= 0) {
+                return;
+            }
+            
+            File file = new File(filePath);
+            if (!file.exists() || savedPointer <= file.length()) {
+                return;
+            }
+            
+            long lastCheckTime = fileTruncationCheckMap.getOrDefault(filePath, 
0L);
+            long currentTime = System.currentTimeMillis();
+            long currentFileLength = file.length();
+            boolean isImmediateTruncation = currentFileLength == 0 || 
currentFileLength < savedPointer * 0.1;
+            
+            if (isImmediateTruncation || (lastCheckTime > 0 && (currentTime - 
lastCheckTime) > 15000)) {
+                log.warn("File truncation detected: pointer {} > length {} for 
file:{}, restarting read",
+                        savedPointer, currentFileLength, filePath);
+                
+                // Shutdown and restart ReadListener for this file
+                for (ReadListener readListener : 
defaultMonitorListener.getReadListenerList()) {
+                    if (readListener instanceof 
com.xiaomi.mone.file.listener.OzHeraReadListener) {
+                        com.xiaomi.mone.file.LogFile2 logFile = 
+                                
((com.xiaomi.mone.file.listener.OzHeraReadListener) readListener).getLogFile();
+                        if (logFile != null && 
filePath.equals(logFile.getFile())) {
+                            logFile.shutdown();
+                            
defaultMonitorListener.remove(logFile.getFileKey());
+                            break;
+                        }
+                    }
+                }
+            }
+            fileTruncationCheckMap.put(filePath, currentTime);
+        } catch (Exception e) {
+            log.debug("Error checking file truncation for path:{}", filePath, 
e);
+        }
+    }
+    
     // Check for inode changes: when a file with the same path is deleted and 
recreated with a new inode
     // DefaultMonitorListener handles file deletion, but may not detect inode 
changes for files with the same path
     private void checkAndShutdownInodeChangedReadListeners() {
@@ -522,5 +582,6 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             fileCollFuture.cancel(false);
         }
         lineMessageList.clear();
+        fileTruncationCheckMap.clear();
     }
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index dce80f31..4c28c01d 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -49,6 +49,12 @@ public class ExecutorUtil {
         return STP_EXECUTOR.scheduleAtFixedRate(command, initialDelay, period, 
unit);
     }
 
+    public static ScheduledFuture<?> schedule(Runnable command,
+                                              long delay,
+                                              TimeUnit unit) {
+        return STP_EXECUTOR.schedule(command, delay, unit);
+    }
+
     public static ExecutorService createPool(String name) {
         String configValue = 
ConfigUtils.getConfigValue("jdk.virtualThreadScheduler.parallelism");
         if (StringUtils.isEmpty(configValue) || Objects.equals("-1", 
configValue)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to