This is an automated email from the ASF dual-hosted git repository.
wangtao29 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 862fe35f fix: log agent leaking file nodes may cause files to never be
closed after deletion (#610)
862fe35f is described below
commit 862fe35f08956dbcdfced0beaf8994926f43c636
Author: Yandi Lee <[email protected]>
AuthorDate: Fri Nov 14 16:56:18 2025 +0800
fix: log agent leaking file nodes may cause files to never be closed after
deletion (#610)
* Fixes log-agent file handle leak
Addresses a file handle leak issue by periodically cleaning up file handles
for deleted files and files with inode changes.
The changes introduce a scheduled task to detect and close file handles
associated with files that no longer exist or have had their inodes changed
(e.g., due to log rotation). This ensures that file resources are properly
released, preventing resource exhaustion and improving the stability of the log
agent.
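Also, adds inode change detection to the `handleAllFileCollectMonitor` and
`reOpen` methods to handle cases where files are recreated with the same name
but a different inode. Truncated files that keep the same inode are also
handled: the read pointer and line number are reset to 0 when the saved
pointer exceeds the current file length.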
* refactor: upgrade log-agent module version to 2.2.12-SNAPSHOT
---------
Co-authored-by: liyandi <[email protected]>
---
ozhera-log/log-agent/pom.xml | 2 +-
.../log/agent/channel/ChannelServiceImpl.java | 82 +++++++++++++--
.../agent/channel/WildcardChannelServiceImpl.java | 61 ++++++++++-
.../log/agent/channel/file/FileListener.java | 4 +-
.../listener/DefaultFileMonitorListener.java | 22 +++-
.../ozhera/log/agent/common/ChannelUtil.java | 65 ++++++++++++
.../channel/listener/InodeChangeDetectionTest.java | 116 +++++++++++++++++++++
7 files changed, 338 insertions(+), 14 deletions(-)
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index 5eed46e5..5b62ad15 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-agent</artifactId>
- <version>2.2.11-SNAPSHOT</version>
+ <version>2.2.12-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 67318f71..97ce773b 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -103,6 +103,8 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private ScheduledFuture<?> scheduledFuture;
+ private ScheduledFuture<?> deletedFileCleanupFuture;
+
/**
* collect once flag
*/
@@ -193,6 +195,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
startCollectFile(channelId, input, patterns);
startExportQueueDataThread();
+ startDeletedFileCleanupTask();
memoryService.refreshMemory(channelMemory);
log.warn("channelId:{}, channelInstanceId:{} start success!
channelDefine:{}", channelId, instanceId(), gson.toJson(this.channelDefine));
}
@@ -240,11 +243,39 @@ public class ChannelServiceImpl extends
AbstractChannelService {
lastLineRemainSendSchedule(input.getPatternCode());
}
+ private void startDeletedFileCleanupTask() {
+ // Periodically cleanup files that no longer exist or have inode
changes to release file handles
+ deletedFileCleanupFuture = ExecutorUtil.scheduleAtFixedRate(() ->
SafeRun.run(() -> {
+ for (String path : new ArrayList<>(logFileMap.keySet())) {
+ if (!FileUtil.exist(path)) {
+ log.info("deleted file cleanup trigger for path:{}", path);
+ cleanFile(path::equals);
+ continue;
+ }
+
+ if (ChannelUtil.isInodeChanged(channelMemory, path)) {
+ Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory,
path);
+ log.info("deleted file cleanup trigger for inode changed
path:{}, oldInode:{}, newInode:{}",
+ path, inodeInfo != null ? inodeInfo[0] :
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+ cleanFile(path::equals);
+ }
+ }
+ }), 60, 60, TimeUnit.SECONDS);
+ }
+
private void handleAllFileCollectMonitor(String patternCode, String
newFilePath, Long channelId) {
if (logFileMap.keySet().stream().anyMatch(key ->
Objects.equals(newFilePath, key))) {
- log.info("collectOnce open file:{}", newFilePath);
- logFileMap.get(newFilePath).setReOpen(true);
+ if (ChannelUtil.isInodeChanged(channelMemory, newFilePath)) {
+ Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory,
newFilePath);
+ log.info("handleAllFileCollectMonitor: inode changed for
path:{}, oldInode:{}, newInode:{}, cleaning up old file handle",
+ newFilePath, inodeInfo != null ? inodeInfo[0] :
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+ cleanFile(newFilePath::equals);
+ readFile(patternCode, newFilePath, channelId);
+ } else {
+ log.info("collectOnce open file:{}", newFilePath);
+ logFileMap.get(newFilePath).setReOpen(true);
+ }
} else {
readFile(patternCode, newFilePath, channelId);
}
@@ -384,9 +415,20 @@ public class ChannelServiceImpl extends
AbstractChannelService {
log.warn("file:{} marked stop to collect", filePath);
return;
}
- //Determine whether the file exists
if (FileUtil.exist(filePath)) {
- stopOldCurrentFileThread(filePath);
+ // Stop old file thread if exists (will close file handle via
setStop(true))
+ // Note: inode change detection is handled by periodic cleanup task
+ if (logFileMap.containsKey(filePath)) {
+ if (ChannelUtil.isInodeChanged(channelMemory, filePath)) {
+ Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory,
filePath);
+ log.info("readFile: inode changed for path:{},
oldInode:{}, newInode:{}, stopping old file thread",
+ filePath, inodeInfo != null ? inodeInfo[0] :
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+ // Clean up all related maps for inode change case
+ cleanFile(filePath::equals);
+ } else {
+ stopOldCurrentFileThread(filePath);
+ }
+ }
log.info("start to collect file,channelId:{},fileName:{}",
channelId, filePath);
logFileMap.put(filePath, logFile);
Future<?> future = getExecutorServiceByType(logTypeEnum).submit(()
-> {
@@ -461,6 +503,16 @@ public class ChannelServiceImpl extends
AbstractChannelService {
filePath, gson.toJson(memoryUnixFileNode),
gson.toJson(currentUnixFileNode));
}
}
+
+ // Check if pointer exceeds file length (file may have been
truncated but inode unchanged)
+ // This handles the case where log rotation truncates the file
without changing inode
+ java.io.File file = new java.io.File(filePath);
+ if (file.exists() && pointer > file.length()) {
+ log.warn("pointer {} exceeds file length {} for file:{},
resetting to 0 (file may have been truncated)",
+ pointer, file.length(), filePath);
+ pointer = 0L;
+ lineNumber = 0L;
+ }
}
ChannelEngine channelEngine = Ioc.ins().getBean(ChannelEngine.class);
ILogFile logFile = channelEngine.logFile();
@@ -510,6 +562,9 @@ public class ChannelServiceImpl extends
AbstractChannelService {
if (null != lastFileLineScheduledFuture) {
lastFileLineScheduledFuture.cancel(false);
}
+ if (null != deletedFileCleanupFuture) {
+ deletedFileCleanupFuture.cancel(false);
+ }
for (Future future : futureMap.values()) {
future.cancel(false);
}
@@ -567,7 +622,16 @@ public class ChannelServiceImpl extends
AbstractChannelService {
readFile(channelDefine.getInput().getPatternCode(), filePath,
getChannelId());
log.info("watch new file create for
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
} else {
- handleExistingLogFileWithRetry(logFile, filePath, ip);
+ if (ChannelUtil.isInodeChanged(channelMemory, filePath)) {
+ Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory,
filePath);
+ log.info("reOpen: inode changed for path:{}, oldInode:{},
newInode:{}, cleaning up old file handle",
+ filePath, inodeInfo != null ? inodeInfo[0] :
"unknown", inodeInfo != null ? inodeInfo[1] : "unknown");
+ cleanFile(filePath::equals);
+ readFile(channelDefine.getInput().getPatternCode(),
filePath, getChannelId());
+ log.info("reOpen: file reopened after inode change for
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
+ } else {
+ handleExistingLogFileWithRetry(logFile, filePath, ip);
+ }
}
} finally {
fileReopenLock.unlock();
@@ -642,7 +706,13 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void delCollFile(String path) {
boolean shouldRemovePath = false;
if (logFileMap.containsKey(path) && fileReadMap.containsKey(path)) {
- if ((Instant.now().toEpochMilli() - fileReadMap.get(path)) >
TimeUnit.MINUTES.toMillis(1)) {
+ if (ChannelUtil.isInodeChanged(channelMemory, path)) {
+ Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory,
path);
+ log.info("delCollFile trigger for inode changed path:{},
oldInode:{}, newInode:{}",
+ path, inodeInfo != null ? inodeInfo[0] : "unknown",
inodeInfo != null ? inodeInfo[1] : "unknown");
+ cleanFile(path::equals);
+ shouldRemovePath = true;
+ } else if ((Instant.now().toEpochMilli() - fileReadMap.get(path))
> TimeUnit.MINUTES.toMillis(1)) {
cleanFile(path::equals);
shouldRemovePath = true;
log.info("stop coll file:{}", path);
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 0b401efd..a991ecc4 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -49,6 +49,7 @@ import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
@@ -186,20 +187,74 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
// 清理无效的文件信息的方法
private void cleanUpInvalidFileInfos() {
+ // Check for inode changes (files with same path but different inode)
+ checkAndShutdownInodeChangedReadListeners();
+
+ // Clean up FileInfoCache for files that no longer exist
ConcurrentMap<String, FileInfo> caches = FileInfoCache.ins().caches();
for (Iterator<Map.Entry<String, FileInfo>> iterator =
caches.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, FileInfo> entry = iterator.next();
FileInfo fileInfo = entry.getValue();
- File file = new File(fileInfo.getFileName());
+ String filePath = fileInfo.getFileName();
+ String cacheKey = entry.getKey();
- if (StringUtils.isEmpty(fileInfo.getFileName())) {
+ if (StringUtils.isEmpty(filePath)) {
continue;
}
+ File file = new File(filePath);
+ // Check if file no longer exists
if (!file.exists()) {
- FileInfoCache.ins().remove(entry.getKey());
+ log.info("cleanUpInvalidFileInfos: file no longer exists,
removing from cache, cacheKey:{}, filePath:{}", cacheKey, filePath);
+ FileInfoCache.ins().remove(cacheKey);
+ }
+ }
+ }
+
+ // Check for inode changes: when a file with the same path is deleted and
recreated with a new inode
+ // DefaultMonitorListener handles file deletion, but may not detect inode
changes for files with the same path
+ private void checkAndShutdownInodeChangedReadListeners() {
+ try {
+ ConcurrentHashMap<String, com.xiaomi.mone.file.ozhera.HeraFile>
fileMap = fileMonitor.getFileMap();
+ ConcurrentHashMap<Object, com.xiaomi.mone.file.ozhera.HeraFile>
map = fileMonitor.getMap();
+
+ List<ReadListener> listeners =
defaultMonitorListener.getReadListenerList();
+
+ for (ReadListener readListener : listeners) {
+ if (readListener instanceof
com.xiaomi.mone.file.listener.OzHeraReadListener) {
+ com.xiaomi.mone.file.listener.OzHeraReadListener
ozHeraReadListener =
+ (com.xiaomi.mone.file.listener.OzHeraReadListener)
readListener;
+ com.xiaomi.mone.file.LogFile2 logFile =
ozHeraReadListener.getLogFile();
+ if (logFile == null) {
+ continue;
+ }
+
+ String filePath = logFile.getFile();
+ Object logFileKey = logFile.getFileKey();
+
+ // Check if file path still exists in fileMap (may be new
file with same path but different inode)
+ com.xiaomi.mone.file.ozhera.HeraFile currentHeraFile =
fileMap.get(filePath);
+
+ // Shutdown old file handle if inode changed (same path,
different inode)
+ if (currentHeraFile != null && !Objects.equals(logFileKey,
currentHeraFile.getFileKey())) {
+ log.info("inode changed for path:{}, oldInode:{},
newInode:{}, shutting down old file handle",
+ filePath, logFileKey,
currentHeraFile.getFileKey());
+ logFile.shutdown();
+ // Remove old listener from readListenerMap (new file
with same path has different inode)
+ defaultMonitorListener.remove(logFileKey);
+ } else if (currentHeraFile == null &&
ChannelUtil.isInodeChanged(channelMemory, filePath)) {
+ // File removed from fileMap but ReadListener still
exists, inode changed
+ log.info("file removed from fileMap but inode changed,
path:{}, oldInode:{}, shutting down old file handle",
+ filePath, logFileKey);
+ logFile.shutdown();
+ // Remove listener from readListenerMap
+ defaultMonitorListener.remove(logFileKey);
+ }
+ }
}
+ } catch (Exception e) {
+ log.error("Error checking and shutting down inode changed
ReadListeners", e);
}
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
index 67768a34..a8abbf55 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileListener.java
@@ -66,7 +66,9 @@ public class FileListener implements FileAlterationListener {
@Override
public void onFileDelete(File file) {
-
+ log.info("onFileDelete:" + file.getAbsolutePath());
+ // File deletion is handled by startDeletedFileCleanupTask and
ordinaryFileChanged's existence check
+ // No need to trigger consumer here to avoid duplicate handling
}
@Override
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
index 0b9c6986..6bdb424d 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
@@ -31,6 +31,7 @@ import org.apache.ozhera.log.agent.channel.ChannelService;
import org.apache.ozhera.log.agent.channel.WildcardChannelServiceImpl;
import org.apache.ozhera.log.agent.channel.file.FileMonitor;
import org.apache.ozhera.log.agent.channel.file.MonitorFile;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
import org.apache.ozhera.log.agent.common.ChannelUtil;
import org.apache.ozhera.log.agent.common.ExecutorUtil;
import org.apache.ozhera.log.api.enums.LogTypeEnum;
@@ -237,11 +238,21 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
/**
* Normal file change event handling
+ * Note: File deletion and inode change detection are handled by:
+ * - startDeletedFileCleanupTask() in ChannelServiceImpl (periodic cleanup)
+ * - reOpen() method in ChannelServiceImpl (inode change check)
*/
private void ordinaryFileChanged(String changedFilePath) {
for (Map.Entry<List<MonitorFile>, ChannelService> channelServiceEntry
: pathChannelServiceMap.entrySet()) {
for (MonitorFile monitorFile : channelServiceEntry.getKey()) {
if
(monitorFile.getFilePattern().matcher(changedFilePath).matches()) {
+ ChannelService service = channelServiceEntry.getValue();
+ if (service == null) {
+ log.warn("ChannelService is null for monitorFile:{}",
monitorFile.getMonitorFileExpress());
+ continue;
+ }
+
+ // Reopen file (inode change and file deletion are handled
by reOpen() and periodic cleanup)
String reOpenFilePath = monitorFile.getRealFilePath();
/**
* OPENTELEMETRY Special processing of logs
@@ -255,14 +266,19 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
}
log.info("【change file path
reopen】started,changedFilePath:{},realFilePath:{},monitorFileExpress:{}",
changedFilePath, reOpenFilePath,
monitorFile.getMonitorFileExpress());
- channelServiceEntry.getValue().reOpen(reOpenFilePath);
- log.info("【end change file path】
end,changedFilePath:{},realFilePath:{},monitorFileExpress:{},InstanceId:{}",
- changedFilePath, reOpenFilePath,
monitorFile.getMonitorFileExpress(),
channelServiceEntry.getValue().instanceId());
+ try {
+ service.reOpen(reOpenFilePath);
+ log.info("【end change file path】
end,changedFilePath:{},realFilePath:{},monitorFileExpress:{},InstanceId:{}",
+ changedFilePath, reOpenFilePath,
monitorFile.getMonitorFileExpress(), service.instanceId());
+ } catch (Exception e) {
+ log.error("Error reopening file:{}", reOpenFilePath,
e);
+ }
}
}
}
}
+
/**
* Special file suffix change event handling Through actual observation,
* the go project found that the error log file of the log is
server.log.wf,
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
index 27a0f1a3..ac7d60fa 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
@@ -101,6 +101,71 @@ public class ChannelUtil {
return new ChannelMemory.UnixFileNode();
}
+ /**
+ * Get inode pair (memory inode and current inode) for a file
+ *
+ * @param channelMemory the channel memory containing file progress
+ * @param filePath the file path to check
+ * @return array with [memoryInode, currentInode] or null if not available
+ */
+ private static ChannelMemory.UnixFileNode[] getInodePair(ChannelMemory
channelMemory, String filePath) {
+ if (channelMemory == null || channelMemory.getFileProgressMap() ==
null) {
+ return null;
+ }
+
+ ChannelMemory.FileProgress fileProgress =
channelMemory.getFileProgressMap().get(filePath);
+ if (fileProgress == null || fileProgress.getUnixFileNode() == null) {
+ return null;
+ }
+
+ ChannelMemory.UnixFileNode memoryInode =
fileProgress.getUnixFileNode();
+ ChannelMemory.UnixFileNode currentInode = buildUnixFileNode(filePath);
+
+ return new ChannelMemory.UnixFileNode[]{memoryInode, currentInode};
+ }
+
+ /**
+ * Check if file inode has changed by comparing memory inode with current
file inode
+ *
+ * @param channelMemory the channel memory containing file progress
+ * @param filePath the file path to check
+ * @return true if inode has changed, false otherwise
+ */
+ public static boolean isInodeChanged(ChannelMemory channelMemory, String
filePath) {
+ ChannelMemory.UnixFileNode[] inodePair = getInodePair(channelMemory,
filePath);
+ if (inodePair == null) {
+ return false;
+ }
+
+ ChannelMemory.UnixFileNode memoryInode = inodePair[0];
+ ChannelMemory.UnixFileNode currentInode = inodePair[1];
+
+ return memoryInode.getSt_ino() != null && currentInode.getSt_ino() !=
null &&
+ !java.util.Objects.equals(memoryInode.getSt_ino(),
currentInode.getSt_ino());
+ }
+
+ /**
+ * Get inode information for logging
+ *
+ * @param channelMemory the channel memory
+ * @param filePath the file path
+ * @return array with [oldInode, newInode] or null if not available
+ */
+ public static Long[] getInodeInfo(ChannelMemory channelMemory, String
filePath) {
+ ChannelMemory.UnixFileNode[] inodePair = getInodePair(channelMemory,
filePath);
+ if (inodePair == null) {
+ return null;
+ }
+
+ ChannelMemory.UnixFileNode memoryInode = inodePair[0];
+ ChannelMemory.UnixFileNode currentInode = inodePair[1];
+
+ if (memoryInode.getSt_ino() != null && currentInode.getSt_ino() !=
null) {
+ return new Long[]{memoryInode.getSt_ino(),
currentInode.getSt_ino()};
+ }
+ return null;
+ }
+
public static long countFilesRecursive(File directory) {
long count = 0;
File[] files = directory.listFiles();
diff --git
a/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/channel/listener/InodeChangeDetectionTest.java
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/channel/listener/InodeChangeDetectionTest.java
new file mode 100644
index 00000000..acedefaa
--- /dev/null
+++
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/channel/listener/InodeChangeDetectionTest.java
@@ -0,0 +1,116 @@
+package org.apache.ozhera.log.agent.channel.listener;
+
+import org.apache.ozhera.log.agent.channel.AbstractChannelService;
+import org.apache.ozhera.log.agent.channel.ChannelDefine;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import org.apache.ozhera.log.agent.channel.file.MonitorFile;
+import org.apache.ozhera.log.agent.common.ChannelUtil;
+import org.apache.ozhera.log.api.enums.LogTypeEnum;
+import org.apache.ozhera.log.api.model.meta.FilterConf;
+import org.apache.ozhera.log.agent.export.MsgExporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Test for inode change detection when file is deleted and recreated with
same name
+ * This simulates the scenario where log rotation deletes and recreates files
with same name
+ */
+public class InodeChangeDetectionTest {
+
+ static class TestChannelService extends AbstractChannelService {
+ volatile CopyOnWriteArrayList<String> canceled = new
CopyOnWriteArrayList<>();
+ volatile CopyOnWriteArrayList<String> reopened = new
CopyOnWriteArrayList<>();
+ List<MonitorFile> monitors;
+ ChannelMemory channelMemory;
+ String filePath;
+
+ TestChannelService(List<MonitorFile> monitors, String filePath,
ChannelMemory.UnixFileNode initialInode) {
+ this.monitors = monitors;
+ this.filePath = filePath;
+ this.channelMemory = new ChannelMemory();
+ Map<String, ChannelMemory.FileProgress> fileProgressMap = new
HashMap<>();
+ ChannelMemory.FileProgress fileProgress = new
ChannelMemory.FileProgress();
+ fileProgress.setUnixFileNode(initialInode);
+ fileProgressMap.put(filePath, fileProgress);
+ this.channelMemory.setFileProgressMap(fileProgressMap);
+ }
+
+ @Override public void start() {}
+ @Override public void close() {}
+ @Override public void deleteCollFile(String directory) {}
+ @Override public void cancelFile(String file) { canceled.add(file); }
+ @Override public void reOpen(String filePath) {
reopened.add(filePath); }
+ @Override public void cleanCollectFiles() {}
+ @Override public void filterRefresh(List<FilterConf> confs) {}
+ @Override public void refresh(ChannelDefine channelDefine, MsgExporter
msgExporter) {}
+ @Override public void stopFile(List<String> filePrefixList) {}
+ @Override public org.apache.ozhera.log.agent.channel.ChannelState
state() { return null; }
+ @Override public List<MonitorFile> getMonitorPathList() { return
monitors; }
+ @Override public ChannelDefine getChannelDefine() { ChannelDefine d =
new ChannelDefine(); d.setChannelId(1L); return d; }
+ @Override public String instanceId() { return "test-inode"; }
+ @Override public ChannelMemory getChannelMemory() { return
channelMemory; }
+ @Override public Map<String, Long> getExpireFileMap() { return new
HashMap<>(); }
+ @Override public Long getLogCounts() { return 0L; }
+ }
+
+ @Test
+ public void ordinaryFileChanged_shouldCallReOpen() throws Exception {
+ Path temp = Files.createTempFile("agent-inode", ".log");
+ String realPath = temp.toString();
+
+ TestChannelService service = new TestChannelService(
+ List.of(MonitorFile.of(realPath, realPath + ".*",
LogTypeEnum.APP_LOG_SIGNAL, false)),
+ realPath, ChannelUtil.buildUnixFileNode(realPath));
+ DefaultFileMonitorListener listener = new DefaultFileMonitorListener();
+ listener.addChannelService(service);
+
+ // Trigger file change event (ordinaryFileChanged now just calls
reOpen)
+ Method m =
DefaultFileMonitorListener.class.getDeclaredMethod("ordinaryFileChanged",
String.class);
+ m.setAccessible(true);
+ m.invoke(listener, realPath);
+
+ // Verify reOpen was called (inode change detection is now handled in
reOpen() method)
+ Assert.assertTrue("File should be reopened",
+ service.reopened.stream().anyMatch(p -> p.equals(realPath) ||
p.contains(realPath)));
+
+ Files.deleteIfExists(temp);
+ }
+
+ @Test
+ public void channelUtil_shouldDetectInodeChange() throws Exception {
+ Path temp = Files.createTempFile("agent-inode-util", ".log");
+ String realPath = temp.toString();
+ ChannelMemory.UnixFileNode initialInode =
ChannelUtil.buildUnixFileNode(realPath);
+
+ if (initialInode.getSt_ino() == null) {
+ Files.deleteIfExists(temp);
+ return;
+ }
+
+ ChannelMemory channelMemory = new ChannelMemory();
+ Map<String, ChannelMemory.FileProgress> fileProgressMap = new
HashMap<>();
+ ChannelMemory.FileProgress fileProgress = new
ChannelMemory.FileProgress();
+ fileProgress.setUnixFileNode(initialInode);
+ fileProgressMap.put(realPath, fileProgress);
+ channelMemory.setFileProgressMap(fileProgressMap);
+
+ Assert.assertFalse("Inode should not be changed initially",
+ ChannelUtil.isInodeChanged(channelMemory, realPath));
+
+ Files.deleteIfExists(temp);
+ Files.createFile(temp);
+
+ Assert.assertTrue("Inode should be changed after file recreation",
+ ChannelUtil.isInodeChanged(channelMemory, realPath));
+
+ Files.deleteIfExists(temp);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]