This is an automated email from the ASF dual-hosted git repository.

wangtao29 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 862fe35f fix: log agent leak file node may causing file never close 
after deleted (#610)
862fe35f is described below

commit 862fe35f08956dbcdfced0beaf8994926f43c636
Author: Yandi Lee <[email protected]>
AuthorDate: Fri Nov 14 16:56:18 2025 +0800

    fix: log agent leak file node may causing file never close after deleted 
(#610)
    
    * Fixes log-agent file handle leak
    
    Addresses a file handle leak issue by periodically cleaning up file handles 
for deleted files and files with inode changes.
    
    The changes introduce a scheduled task to detect and close file handles 
associated with files that no longer exist or have had their inodes changed 
(e.g., due to log rotation). This ensures that file resources are properly 
released, preventing resource exhaustion and improving the stability of the log 
agent.
    
    Also, adds inode change detection to `handleAllFileCollectMonitor` and 
`reOpen` methods to handle cases where files are recreated with the same name 
but a different inode. Truncated files with the same inode are also handled.
    
    * refactor: upgrade log-agent module version to 2.2.12-SNAPSHOT
    
    ---------
    
    Co-authored-by: liyandi <[email protected]>
---
 ozhera-log/log-agent/pom.xml                       |   2 +-
 .../log/agent/channel/ChannelServiceImpl.java      |  82 +++++++++++++--
 .../agent/channel/WildcardChannelServiceImpl.java  |  61 ++++++++++-
 .../log/agent/channel/file/FileListener.java       |   4 +-
 .../listener/DefaultFileMonitorListener.java       |  22 +++-
 .../ozhera/log/agent/common/ChannelUtil.java       |  65 ++++++++++++
 .../channel/listener/InodeChangeDetectionTest.java | 116 +++++++++++++++++++++
 7 files changed, 338 insertions(+), 14 deletions(-)

diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index 5eed46e5..5b62ad15 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-agent</artifactId>
-    <version>2.2.11-SNAPSHOT</version>
+    <version>2.2.12-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 67318f71..97ce773b 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -103,6 +103,8 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private ScheduledFuture<?> scheduledFuture;
 
+    private ScheduledFuture<?> deletedFileCleanupFuture;
+
     /**
      * collect once flag
      */
@@ -193,6 +195,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         startCollectFile(channelId, input, patterns);
 
         startExportQueueDataThread();
+        startDeletedFileCleanupTask();
         memoryService.refreshMemory(channelMemory);
         log.warn("channelId:{}, channelInstanceId:{} start success! 
channelDefine:{}", channelId, instanceId(), gson.toJson(this.channelDefine));
     }
@@ -240,11 +243,39 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         lastLineRemainSendSchedule(input.getPatternCode());
     }
 
+    private void startDeletedFileCleanupTask() {
+        // Periodically cleanup files that no longer exist or have inode 
changes to release file handles
+        deletedFileCleanupFuture = ExecutorUtil.scheduleAtFixedRate(() -> 
SafeRun.run(() -> {
+            for (String path : new ArrayList<>(logFileMap.keySet())) {
+                if (!FileUtil.exist(path)) {
+                    log.info("deleted file cleanup trigger for path:{}", path);
+                    cleanFile(path::equals);
+                    continue;
+                }
+                
+                if (ChannelUtil.isInodeChanged(channelMemory, path)) {
+                    Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory, 
path);
+                    log.info("deleted file cleanup trigger for inode changed 
path:{}, oldInode:{}, newInode:{}",
+                            path, inodeInfo != null ? inodeInfo[0] : 
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+                    cleanFile(path::equals);
+                }
+            }
+        }), 60, 60, TimeUnit.SECONDS);
+    }
+
 
     private void handleAllFileCollectMonitor(String patternCode, String 
newFilePath, Long channelId) {
         if (logFileMap.keySet().stream().anyMatch(key -> 
Objects.equals(newFilePath, key))) {
-            log.info("collectOnce open file:{}", newFilePath);
-            logFileMap.get(newFilePath).setReOpen(true);
+            if (ChannelUtil.isInodeChanged(channelMemory, newFilePath)) {
+                Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory, 
newFilePath);
+                log.info("handleAllFileCollectMonitor: inode changed for 
path:{}, oldInode:{}, newInode:{}, cleaning up old file handle",
+                        newFilePath, inodeInfo != null ? inodeInfo[0] : 
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+                cleanFile(newFilePath::equals);
+                readFile(patternCode, newFilePath, channelId);
+            } else {
+                log.info("collectOnce open file:{}", newFilePath);
+                logFileMap.get(newFilePath).setReOpen(true);
+            }
         } else {
             readFile(patternCode, newFilePath, channelId);
         }
@@ -384,9 +415,20 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
             log.warn("file:{} marked stop to collect", filePath);
             return;
         }
-        //Determine whether the file exists
         if (FileUtil.exist(filePath)) {
-            stopOldCurrentFileThread(filePath);
+            // Stop old file thread if exists (will close file handle via 
setStop(true))
+            // Note: inode change detection is handled by periodic cleanup task
+            if (logFileMap.containsKey(filePath)) {
+                if (ChannelUtil.isInodeChanged(channelMemory, filePath)) {
+                    Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory, 
filePath);
+                    log.info("readFile: inode changed for path:{}, 
oldInode:{}, newInode:{}, stopping old file thread",
+                            filePath, inodeInfo != null ? inodeInfo[0] : 
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+                    // Clean up all related maps for inode change case
+                    cleanFile(filePath::equals);
+                } else {
+                    stopOldCurrentFileThread(filePath);
+                }
+            }
             log.info("start to collect file,channelId:{},fileName:{}", 
channelId, filePath);
             logFileMap.put(filePath, logFile);
             Future<?> future = getExecutorServiceByType(logTypeEnum).submit(() 
-> {
@@ -461,6 +503,16 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                             filePath, gson.toJson(memoryUnixFileNode), 
gson.toJson(currentUnixFileNode));
                 }
             }
+            
+            // Check if pointer exceeds file length (file may have been 
truncated but inode unchanged)
+            // This handles the case where log rotation truncates the file 
without changing inode
+            java.io.File file = new java.io.File(filePath);
+            if (file.exists() && pointer > file.length()) {
+                log.warn("pointer {} exceeds file length {} for file:{}, 
resetting to 0 (file may have been truncated)",
+                        pointer, file.length(), filePath);
+                pointer = 0L;
+                lineNumber = 0L;
+            }
         }
         ChannelEngine channelEngine = Ioc.ins().getBean(ChannelEngine.class);
         ILogFile logFile = channelEngine.logFile();
@@ -510,6 +562,9 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         if (null != lastFileLineScheduledFuture) {
             lastFileLineScheduledFuture.cancel(false);
         }
+        if (null != deletedFileCleanupFuture) {
+            deletedFileCleanupFuture.cancel(false);
+        }
         for (Future future : futureMap.values()) {
             future.cancel(false);
         }
@@ -567,7 +622,16 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                 readFile(channelDefine.getInput().getPatternCode(), filePath, 
getChannelId());
                 log.info("watch new file create for 
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
             } else {
-                handleExistingLogFileWithRetry(logFile, filePath, ip);
+                if (ChannelUtil.isInodeChanged(channelMemory, filePath)) {
+                    Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory, 
filePath);
+                    log.info("reOpen: inode changed for path:{}, oldInode:{}, 
newInode:{}, cleaning up old file handle",
+                            filePath, inodeInfo != null ? inodeInfo[0] : 
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+                    cleanFile(filePath::equals);
+                    readFile(channelDefine.getInput().getPatternCode(), 
filePath, getChannelId());
+                    log.info("reOpen: file reopened after inode change for 
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
+                } else {
+                    handleExistingLogFileWithRetry(logFile, filePath, ip);
+                }
             }
         } finally {
             fileReopenLock.unlock();
@@ -642,7 +706,13 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     private void delCollFile(String path) {
         boolean shouldRemovePath = false;
         if (logFileMap.containsKey(path) && fileReadMap.containsKey(path)) {
-            if ((Instant.now().toEpochMilli() - fileReadMap.get(path)) > 
TimeUnit.MINUTES.toMillis(1)) {
+            if (ChannelUtil.isInodeChanged(channelMemory, path)) {
+                Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory, 
path);
+                log.info("delCollFile trigger for inode changed path:{}, 
oldInode:{}, newInode:{}",
+                        path, inodeInfo != null ? inodeInfo[0] : "unknown", 
inodeInfo != null ? inodeInfo[1] : "unknown");
+                cleanFile(path::equals);
+                shouldRemovePath = true;
+            } else if ((Instant.now().toEpochMilli() - fileReadMap.get(path)) 
> TimeUnit.MINUTES.toMillis(1)) {
                 cleanFile(path::equals);
                 shouldRemovePath = true;
                 log.info("stop coll file:{}", path);
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 0b401efd..a991ecc4 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -49,6 +49,7 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
@@ -186,20 +187,74 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     // 清理无效的文件信息的方法
     private void cleanUpInvalidFileInfos() {
+        // Check for inode changes (files with same path but different inode)
+        checkAndShutdownInodeChangedReadListeners();
+
+        // Clean up FileInfoCache for files that no longer exist
         ConcurrentMap<String, FileInfo> caches = FileInfoCache.ins().caches();
 
         for (Iterator<Map.Entry<String, FileInfo>> iterator = 
caches.entrySet().iterator(); iterator.hasNext(); ) {
             Map.Entry<String, FileInfo> entry = iterator.next();
             FileInfo fileInfo = entry.getValue();
-            File file = new File(fileInfo.getFileName());
+            String filePath = fileInfo.getFileName();
+            String cacheKey = entry.getKey();
 
-            if (StringUtils.isEmpty(fileInfo.getFileName())) {
+            if (StringUtils.isEmpty(filePath)) {
                 continue;
             }
 
+            File file = new File(filePath);
+            // Check if file no longer exists
             if (!file.exists()) {
-                FileInfoCache.ins().remove(entry.getKey());
+                log.info("cleanUpInvalidFileInfos: file no longer exists, 
removing from cache, cacheKey:{}, filePath:{}", cacheKey, filePath);
+                FileInfoCache.ins().remove(cacheKey);
+            }
+        }
+    }
+
+    // Check for inode changes: when a file with the same path is deleted and 
recreated with a new inode
+    // DefaultMonitorListener handles file deletion, but may not detect inode 
changes for files with the same path
+    private void checkAndShutdownInodeChangedReadListeners() {
+        try {
+            ConcurrentHashMap<String, com.xiaomi.mone.file.ozhera.HeraFile> 
fileMap = fileMonitor.getFileMap();
+            ConcurrentHashMap<Object, com.xiaomi.mone.file.ozhera.HeraFile> 
map = fileMonitor.getMap();
+            
+            List<ReadListener> listeners = 
defaultMonitorListener.getReadListenerList();
+            
+            for (ReadListener readListener : listeners) {
+                if (readListener instanceof 
com.xiaomi.mone.file.listener.OzHeraReadListener) {
+                    com.xiaomi.mone.file.listener.OzHeraReadListener 
ozHeraReadListener = 
+                            (com.xiaomi.mone.file.listener.OzHeraReadListener) 
readListener;
+                    com.xiaomi.mone.file.LogFile2 logFile = 
ozHeraReadListener.getLogFile();
+                    if (logFile == null) {
+                        continue;
+                    }
+                    
+                    String filePath = logFile.getFile();
+                    Object logFileKey = logFile.getFileKey();
+                    
+                    // Check if file path still exists in fileMap (may be new 
file with same path but different inode)
+                    com.xiaomi.mone.file.ozhera.HeraFile currentHeraFile = 
fileMap.get(filePath);
+                    
+                    // Shutdown old file handle if inode changed (same path, 
different inode)
+                    if (currentHeraFile != null && !Objects.equals(logFileKey, 
currentHeraFile.getFileKey())) {
+                        log.info("inode changed for path:{}, oldInode:{}, 
newInode:{}, shutting down old file handle",
+                                filePath, logFileKey, 
currentHeraFile.getFileKey());
+                        logFile.shutdown();
+                        // Remove old listener from readListenerMap (new file 
with same path has different inode)
+                        defaultMonitorListener.remove(logFileKey);
+                    } else if (currentHeraFile == null && 
ChannelUtil.isInodeChanged(channelMemory, filePath)) {
+                        // File removed from fileMap but ReadListener still 
exists, inode changed
+                        log.info("file removed from fileMap but inode changed, 
path:{}, oldInode:{}, shutting down old file handle",
+                                filePath, logFileKey);
+                        logFile.shutdown();
+                        // Remove listener from readListenerMap
+                        defaultMonitorListener.remove(logFileKey);
+                    }
+                }
             }
+        } catch (Exception e) {
+            log.error("Error checking and shutting down inode changed 
ReadListeners", e);
         }
     }
 
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
index 67768a34..a8abbf55 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
@@ -66,7 +66,9 @@ public class FileListener implements FileAlterationListener {
 
     @Override
     public void onFileDelete(File file) {
-
+        log.info("onFileDelete:" + file.getAbsolutePath());
+        // File deletion is handled by startDeletedFileCleanupTask and 
ordinaryFileChanged's existence check
+        // No need to trigger consumer here to avoid duplicate handling
     }
 
     @Override
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
index 0b9c6986..6bdb424d 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
@@ -31,6 +31,7 @@ import org.apache.ozhera.log.agent.channel.ChannelService;
 import org.apache.ozhera.log.agent.channel.WildcardChannelServiceImpl;
 import org.apache.ozhera.log.agent.channel.file.FileMonitor;
 import org.apache.ozhera.log.agent.channel.file.MonitorFile;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
 import org.apache.ozhera.log.agent.common.ChannelUtil;
 import org.apache.ozhera.log.agent.common.ExecutorUtil;
 import org.apache.ozhera.log.api.enums.LogTypeEnum;
@@ -237,11 +238,21 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
 
     /**
      * Normal file change event handling
+     * Note: File deletion and inode change detection are handled by:
+     * - startDeletedFileCleanupTask() in ChannelServiceImpl (periodic cleanup)
+     * - reOpen() method in ChannelServiceImpl (inode change check)
      */
     private void ordinaryFileChanged(String changedFilePath) {
         for (Map.Entry<List<MonitorFile>, ChannelService> channelServiceEntry 
: pathChannelServiceMap.entrySet()) {
             for (MonitorFile monitorFile : channelServiceEntry.getKey()) {
                 if 
(monitorFile.getFilePattern().matcher(changedFilePath).matches()) {
+                    ChannelService service = channelServiceEntry.getValue();
+                    if (service == null) {
+                        log.warn("ChannelService is null for monitorFile:{}", 
monitorFile.getMonitorFileExpress());
+                        continue;
+                    }
+                    
+                    // Reopen file (inode change and file deletion are handled 
by reOpen() and periodic cleanup)
                     String reOpenFilePath = monitorFile.getRealFilePath();
                     /**
                      * OPENTELEMETRY Special processing of logs
@@ -255,14 +266,19 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
                     }
                     log.info("【change file path 
reopen】started,changedFilePath:{},realFilePath:{},monitorFileExpress:{}",
                             changedFilePath, reOpenFilePath, 
monitorFile.getMonitorFileExpress());
-                    channelServiceEntry.getValue().reOpen(reOpenFilePath);
-                    log.info("【end change file path】 
end,changedFilePath:{},realFilePath:{},monitorFileExpress:{},InstanceId:{}",
-                            changedFilePath, reOpenFilePath, 
monitorFile.getMonitorFileExpress(), 
channelServiceEntry.getValue().instanceId());
+                    try {
+                        service.reOpen(reOpenFilePath);
+                        log.info("【end change file path】 
end,changedFilePath:{},realFilePath:{},monitorFileExpress:{},InstanceId:{}",
+                                changedFilePath, reOpenFilePath, 
monitorFile.getMonitorFileExpress(), service.instanceId());
+                    } catch (Exception e) {
+                        log.error("Error reopening file:{}", reOpenFilePath, 
e);
+                    }
                 }
             }
         }
     }
 
+
     /**
      * Special file suffix change event handling Through actual observation,
      * the go project found that the error log file of the log is 
server.log.wf,
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
index 27a0f1a3..ac7d60fa 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
@@ -101,6 +101,71 @@ public class ChannelUtil {
         return new ChannelMemory.UnixFileNode();
     }
 
+    /**
+     * Get inode pair (memory inode and current inode) for a file
+     *
+     * @param channelMemory the channel memory containing file progress
+     * @param filePath the file path to check
+     * @return array with [memoryInode, currentInode] or null if not available
+     */
+    private static ChannelMemory.UnixFileNode[] getInodePair(ChannelMemory 
channelMemory, String filePath) {
+        if (channelMemory == null || channelMemory.getFileProgressMap() == 
null) {
+            return null;
+        }
+        
+        ChannelMemory.FileProgress fileProgress = 
channelMemory.getFileProgressMap().get(filePath);
+        if (fileProgress == null || fileProgress.getUnixFileNode() == null) {
+            return null;
+        }
+        
+        ChannelMemory.UnixFileNode memoryInode = 
fileProgress.getUnixFileNode();
+        ChannelMemory.UnixFileNode currentInode = buildUnixFileNode(filePath);
+        
+        return new ChannelMemory.UnixFileNode[]{memoryInode, currentInode};
+    }
+
+    /**
+     * Check if file inode has changed by comparing memory inode with current 
file inode
+     *
+     * @param channelMemory the channel memory containing file progress
+     * @param filePath the file path to check
+     * @return true if inode has changed, false otherwise
+     */
+    public static boolean isInodeChanged(ChannelMemory channelMemory, String 
filePath) {
+        ChannelMemory.UnixFileNode[] inodePair = getInodePair(channelMemory, 
filePath);
+        if (inodePair == null) {
+            return false;
+        }
+        
+        ChannelMemory.UnixFileNode memoryInode = inodePair[0];
+        ChannelMemory.UnixFileNode currentInode = inodePair[1];
+        
+        return memoryInode.getSt_ino() != null && currentInode.getSt_ino() != 
null &&
+               !java.util.Objects.equals(memoryInode.getSt_ino(), 
currentInode.getSt_ino());
+    }
+
+    /**
+     * Get inode information for logging
+     *
+     * @param channelMemory the channel memory
+     * @param filePath the file path
+     * @return array with [oldInode, newInode] or null if not available
+     */
+    public static Long[] getInodeInfo(ChannelMemory channelMemory, String 
filePath) {
+        ChannelMemory.UnixFileNode[] inodePair = getInodePair(channelMemory, 
filePath);
+        if (inodePair == null) {
+            return null;
+        }
+        
+        ChannelMemory.UnixFileNode memoryInode = inodePair[0];
+        ChannelMemory.UnixFileNode currentInode = inodePair[1];
+        
+        if (memoryInode.getSt_ino() != null && currentInode.getSt_ino() != 
null) {
+            return new Long[]{memoryInode.getSt_ino(), 
currentInode.getSt_ino()};
+        }
+        return null;
+    }
+
     public static long countFilesRecursive(File directory) {
         long count = 0;
         File[] files = directory.listFiles();
diff --git 
a/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/channel/listener/InodeChangeDetectionTest.java
 
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/channel/listener/InodeChangeDetectionTest.java
new file mode 100644
index 00000000..acedefaa
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/channel/listener/InodeChangeDetectionTest.java
@@ -0,0 +1,116 @@
+package org.apache.ozhera.log.agent.channel.listener;
+
+import org.apache.ozhera.log.agent.channel.AbstractChannelService;
+import org.apache.ozhera.log.agent.channel.ChannelDefine;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import org.apache.ozhera.log.agent.channel.file.MonitorFile;
+import org.apache.ozhera.log.agent.common.ChannelUtil;
+import org.apache.ozhera.log.api.enums.LogTypeEnum;
+import org.apache.ozhera.log.api.model.meta.FilterConf;
+import org.apache.ozhera.log.agent.export.MsgExporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Test for inode change detection when file is deleted and recreated with 
same name
+ * This simulates the scenario where log rotation deletes and recreates files 
with same name
+ */
+public class InodeChangeDetectionTest {
+
+    static class TestChannelService extends AbstractChannelService {
+        volatile CopyOnWriteArrayList<String> canceled = new 
CopyOnWriteArrayList<>();
+        volatile CopyOnWriteArrayList<String> reopened = new 
CopyOnWriteArrayList<>();
+        List<MonitorFile> monitors;
+        ChannelMemory channelMemory;
+        String filePath;
+
+        TestChannelService(List<MonitorFile> monitors, String filePath, 
ChannelMemory.UnixFileNode initialInode) {
+            this.monitors = monitors;
+            this.filePath = filePath;
+            this.channelMemory = new ChannelMemory();
+            Map<String, ChannelMemory.FileProgress> fileProgressMap = new 
HashMap<>();
+            ChannelMemory.FileProgress fileProgress = new 
ChannelMemory.FileProgress();
+            fileProgress.setUnixFileNode(initialInode);
+            fileProgressMap.put(filePath, fileProgress);
+            this.channelMemory.setFileProgressMap(fileProgressMap);
+        }
+
+        @Override public void start() {}
+        @Override public void close() {}
+        @Override public void deleteCollFile(String directory) {}
+        @Override public void cancelFile(String file) { canceled.add(file); }
+        @Override public void reOpen(String filePath) { 
reopened.add(filePath); }
+        @Override public void cleanCollectFiles() {}
+        @Override public void filterRefresh(List<FilterConf> confs) {}
+        @Override public void refresh(ChannelDefine channelDefine, MsgExporter 
msgExporter) {}
+        @Override public void stopFile(List<String> filePrefixList) {}
+        @Override public org.apache.ozhera.log.agent.channel.ChannelState 
state() { return null; }
+        @Override public List<MonitorFile> getMonitorPathList() { return 
monitors; }
+        @Override public ChannelDefine getChannelDefine() { ChannelDefine d = 
new ChannelDefine(); d.setChannelId(1L); return d; }
+        @Override public String instanceId() { return "test-inode"; }
+        @Override public ChannelMemory getChannelMemory() { return 
channelMemory; }
+        @Override public Map<String, Long> getExpireFileMap() { return new 
HashMap<>(); }
+        @Override public Long getLogCounts() { return 0L; }
+    }
+
+    @Test
+    public void ordinaryFileChanged_shouldCallReOpen() throws Exception {
+        Path temp = Files.createTempFile("agent-inode", ".log");
+        String realPath = temp.toString();
+        
+        TestChannelService service = new TestChannelService(
+                List.of(MonitorFile.of(realPath, realPath + ".*", 
LogTypeEnum.APP_LOG_SIGNAL, false)),
+                realPath, ChannelUtil.buildUnixFileNode(realPath));
+        DefaultFileMonitorListener listener = new DefaultFileMonitorListener();
+        listener.addChannelService(service);
+        
+        // Trigger file change event (ordinaryFileChanged now just calls 
reOpen)
+        Method m = 
DefaultFileMonitorListener.class.getDeclaredMethod("ordinaryFileChanged", 
String.class);
+        m.setAccessible(true);
+        m.invoke(listener, realPath);
+        
+        // Verify reOpen was called (inode change detection is now handled in 
reOpen() method)
+        Assert.assertTrue("File should be reopened",
+                service.reopened.stream().anyMatch(p -> p.equals(realPath) || 
p.contains(realPath)));
+        
+        Files.deleteIfExists(temp);
+    }
+
+    @Test
+    public void channelUtil_shouldDetectInodeChange() throws Exception {
+        Path temp = Files.createTempFile("agent-inode-util", ".log");
+        String realPath = temp.toString();
+        ChannelMemory.UnixFileNode initialInode = 
ChannelUtil.buildUnixFileNode(realPath);
+        
+        if (initialInode.getSt_ino() == null) {
+            Files.deleteIfExists(temp);
+            return;
+        }
+        
+        ChannelMemory channelMemory = new ChannelMemory();
+        Map<String, ChannelMemory.FileProgress> fileProgressMap = new 
HashMap<>();
+        ChannelMemory.FileProgress fileProgress = new 
ChannelMemory.FileProgress();
+        fileProgress.setUnixFileNode(initialInode);
+        fileProgressMap.put(realPath, fileProgress);
+        channelMemory.setFileProgressMap(fileProgressMap);
+        
+        Assert.assertFalse("Inode should not be changed initially",
+                ChannelUtil.isInodeChanged(channelMemory, realPath));
+        
+        Files.deleteIfExists(temp);
+        Files.createFile(temp);
+        
+        Assert.assertTrue("Inode should be changed after file recreation",
+                ChannelUtil.isInodeChanged(channelMemory, realPath));
+        
+        Files.deleteIfExists(temp);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to