This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b833682 feat: optimize file monitoring and configuration management 
functions (#581)
4b833682 is described below

commit 4b83368267c3f1ec971e8d98533028bd0ea47333
Author: wtt <[email protected]>
AuthorDate: Tue May 13 15:22:44 2025 +0800

    feat: optimize file monitoring and configuration management functions (#581)
---
 .../log/agent/channel/ChannelServiceFactory.java   |  3 +-
 .../log/agent/channel/ChannelServiceImpl.java      |  1 +
 .../agent/channel/file/InodeFileComparator.java    | 39 ++++++++++
 .../listener/DefaultFileMonitorListener.java       |  2 +-
 .../service/impl/MilogConfigNacosServiceImpl.java  | 85 ++++++++++------------
 .../log/stream/config/MilogConfigListener.java     |  6 ++
 .../job/extension/StreamCommonExtension.java       |  2 +
 .../impl/DefaultStreamCommonExtension.java         |  7 ++
 8 files changed, 97 insertions(+), 48 deletions(-)

diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index 20dc82b2..6801aa6b 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -43,8 +43,7 @@ public class ChannelServiceFactory {
 
     private final AgentMemoryService agentMemoryService;
     private final String memoryBasePath;
-
-    private static final Pattern regexCharsPattern = 
Pattern.compile("[*+?^${}()|\\[\\]\\\\]");
+    private static final Pattern regexCharsPattern = 
Pattern.compile("[*+?^${}()\\[\\]\\\\]");
 
     public ChannelServiceFactory(AgentMemoryService agentMemoryService, String 
memoryBasePath) {
         this.agentMemoryService = agentMemoryService;
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 6e3a1ece..6a284fee 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -610,6 +610,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         return monitorFileList;
     }
 
+    @Override
     public ChannelMemory getChannelMemory() {
         return channelMemory;
     }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
index 7713c4a4..0399341e 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
@@ -27,6 +27,9 @@ import org.apache.ozhera.log.agent.common.ChannelUtil;
 
 import java.io.File;
 import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author wtt
@@ -47,6 +50,14 @@ public class InodeFileComparator extends 
DefaultFileComparator {
 
     private static final List<String> filePaths = Lists.newArrayList();
 
+    private static final ScheduledExecutorService cleanupExecutor = 
Executors.newSingleThreadScheduledExecutor();
+
+
+    static {
+        
cleanupExecutor.scheduleAtFixedRate(InodeFileComparator::cleanupStaleEntries,
+                1, 1, TimeUnit.MINUTES);
+    }
+
     @Override
     public int compare(File file1, File file2) {
 //        log.info("InodeFileComparator compare 
file1:{},file2:{},filePaths:{}", file1, file2, GSON.toJson(filePaths));
@@ -86,4 +97,32 @@ public class InodeFileComparator extends 
DefaultFileComparator {
         log.info("InodeFileComparator remove file : {}", filePath);
         filePaths.remove(filePath);
     }
+
+    private static void cleanupStaleEntries() {
+        try {
+            log.debug("Starting cleanup of stale INODE_MAP entries");
+            Iterator<Map.Entry<String, Long>> iterator = 
INODE_MAP.entrySet().iterator();
+            int removedCount = 0;
+
+            while (iterator.hasNext()) {
+                Map.Entry<String, Long> entry = iterator.next();
+                String filePath = entry.getKey();
+                File file = new File(filePath);
+
+                if (!file.exists()) {
+                    iterator.remove();
+                    removedCount++;
+                    log.debug("Removed stale entry for file: {}", filePath);
+                }
+            }
+
+            if (removedCount > 0) {
+                log.info("Cleaned up {} stale entries from INODE_MAP", 
removedCount);
+            }
+        } catch (Exception e) {
+            log.error("Error during INODE_MAP cleanup", e);
+        }
+    }
+
+
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
index c8a5cc60..5cc6ece7 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
@@ -274,7 +274,7 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
         for (Map.Entry<List<MonitorFile>, ChannelService> channelServiceEntry 
: pathChannelServiceMap.entrySet()) {
             for (MonitorFile monitorFile : channelServiceEntry.getKey()) {
                 if (filterSuffixList.stream()
-                        .filter(s -> 
monitorFile.getRealFilePath().contains(s)).findAny().isPresent()
+                        .anyMatch(s -> 
monitorFile.getRealFilePath().contains(s))
                         && 
monitorFile.getFilePattern().matcher(changedFilePath).matches()) {
                     serviceMap.put(monitorFile.getRealFilePath(), 
channelServiceEntry.getValue());
                 }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 354aaf55..3e2261a3 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -335,63 +335,58 @@ public class MilogConfigNacosServiceImpl implements 
MilogConfigNacosService {
         }
         // Delete configuration -- Delete log-tail
         if (OperateEnum.DELETE_OPERATE.getCode().equals(type) && 
!LOG_STORE.equalsIgnoreCase(changeType)) {
-            if (null != existConfig) {
-                List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
-                SinkConfig currentStoreConfig = spaceConfig.stream()
-                        .filter(sinkConfig -> 
sinkConfig.getLogstoreId().equals(storeId))
-                        .findFirst()
-                        .orElse(null);
-                if (null != currentStoreConfig) {
-                    List<LogtailConfig> logTailConfigs = 
currentStoreConfig.getLogtailConfigs();
-                    List<LogtailConfig> logtailConfigList = new 
ArrayList<>(logTailConfigs);
-
-                    if (null != tailId && 
CollectionUtils.isNotEmpty(logTailConfigs) &&
-                            logTailConfigs.stream().anyMatch(config -> 
config.getLogtailId().equals(tailId))) {
-                        logtailConfigList.removeIf(logtailConfig -> 
logtailConfig.getLogtailId().equals(tailId));
-                    }
-                    currentStoreConfig.setLogtailConfigs(logtailConfigList);
+            List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
+            SinkConfig currentStoreConfig = spaceConfig.stream()
+                    .filter(sinkConfig -> 
sinkConfig.getLogstoreId().equals(storeId))
+                    .findFirst()
+                    .orElse(null);
+            if (null != currentStoreConfig) {
+                List<LogtailConfig> logTailConfigs = 
currentStoreConfig.getLogtailConfigs();
+                List<LogtailConfig> logtailConfigList = new 
ArrayList<>(logTailConfigs);
+
+                if (null != tailId && 
CollectionUtils.isNotEmpty(logTailConfigs) &&
+                        logTailConfigs.stream().anyMatch(config -> null != 
config.getLogtailId() && config.getLogtailId().equals(tailId))) {
+                    logtailConfigList.removeIf(logtailConfig -> null != 
logtailConfig.getLogtailId() &&
+                            logtailConfig.getLogtailId().equals(tailId));
                 }
+                currentStoreConfig.setLogtailConfigs(logtailConfigList);
             }
         }
         // Delete configuration -- Delete log-tail
         if (OperateEnum.DELETE_OPERATE.getCode().equals(type) && 
LOG_STORE.equalsIgnoreCase(changeType)) {
-            if (null != existConfig) {
-                List<SinkConfig> sinkConfigListDelStore = 
existConfig.getSpaceConfig().stream()
-                        .filter(sinkConfig -> 
!storeId.equals(sinkConfig.getLogstoreId()))
-                        .collect(Collectors.toList());
-                existConfig.setSpaceConfig(sinkConfigListDelStore);
-            }
+            List<SinkConfig> sinkConfigListDelStore = 
existConfig.getSpaceConfig().stream()
+                    .filter(sinkConfig -> 
!storeId.equals(sinkConfig.getLogstoreId()))
+                    .collect(Collectors.toList());
+            existConfig.setSpaceConfig(sinkConfigListDelStore);
         }
         // Modify the configuration -- find a specific tail under this store 
to make changes
         if (OperateEnum.UPDATE_OPERATE.getCode().equals(type)) {
-            if (null != existConfig) {
-                List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
-                //Compare whether the store has changed
-                SinkConfig newSinkConfig = assembleSinkConfig(storeId, tailId, 
motorRoomEn);
-                SinkConfig currentStoreConfig = spaceConfig.stream()
-                        .filter(sinkConfig -> 
sinkConfig.getLogstoreId().equals(storeId))
+            List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
+            //Compare whether the store has changed
+            SinkConfig newSinkConfig = assembleSinkConfig(storeId, tailId, 
motorRoomEn);
+            SinkConfig currentStoreConfig = spaceConfig.stream()
+                    .filter(sinkConfig -> 
sinkConfig.getLogstoreId().equals(storeId))
+                    .findFirst()
+                    .orElse(null);
+            if (null != currentStoreConfig) {
+                if (!newSinkConfig.equals(currentStoreConfig)) {
+                    currentStoreConfig.updateStoreParam(newSinkConfig);
+                }
+                // Find the specific tail under the old store
+                LogtailConfig filterLogTailConfig = 
currentStoreConfig.getLogtailConfigs().stream()
+                        .filter(logTailConfig -> Objects.equals(tailId, 
logTailConfig.getLogtailId()))
                         .findFirst()
                         .orElse(null);
-                if (null != currentStoreConfig) {
-                    if (!newSinkConfig.equals(currentStoreConfig)) {
-                        currentStoreConfig.updateStoreParam(newSinkConfig);
-                    }
-                    // Find the specific tail under the old store
-                    LogtailConfig filterLogTailConfig = 
currentStoreConfig.getLogtailConfigs().stream()
-                            .filter(logTailConfig -> Objects.equals(tailId, 
logTailConfig.getLogtailId()))
-                            .findFirst()
-                            .orElse(null);
-                    if (null != filterLogTailConfig) {
-                        
BeanUtil.copyProperties(assembleLogTailConfigs(tailId), filterLogTailConfig);
-                    } else {
-                        log.info("query logtailConfig no designed 
config,tailId:{},insert", tailId);
-                        
currentStoreConfig.getLogtailConfigs().add(assembleLogTailConfigs(tailId));
-                    }
+                if (null != filterLogTailConfig) {
+                    BeanUtil.copyProperties(assembleLogTailConfigs(tailId), 
filterLogTailConfig);
                 } else {
-                    //Does not exist, new
-                    //New addition to the logstore
-                    spaceConfig.add(assembleSinkConfig(storeId, tailId, 
motorRoomEn));
+                    log.info("query logtailConfig no designed 
config,tailId:{},insert", tailId);
+                    
currentStoreConfig.getLogtailConfigs().add(assembleLogTailConfigs(tailId));
                 }
+            } else {
+                //Does not exist, new
+                //New addition to the logstore
+                spaceConfig.add(assembleSinkConfig(storeId, tailId, 
motorRoomEn));
             }
         }
         return existConfig;
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index 6b657be1..b0d31c9e 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -96,6 +96,9 @@ public class MilogConfigListener {
             List<SinkConfig> sinkConfigs = newMilogSpaceData.getSpaceConfig();
             stopUnusedOldStoreJobs(sinkConfigs);
             for (SinkConfig sinkConfig : sinkConfigs) {
+                if (!streamCommonExtension.preSinkConfigExecution(sinkConfig, 
milogSpaceData.getMilogSpaceId())) {
+                    continue;
+                }
                 stopOldJobsForRemovedTailIds(sinkConfig);
                 if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
                     //Whether the submission store information changes, the 
change stops
@@ -246,6 +249,9 @@ public class MilogConfigListener {
         List<SinkConfig> newSpaceConfig = newMilogSpaceData.getSpaceConfig();
         if (newSpaceConfig != null) {
             for (SinkConfig sinkConfig : newSpaceConfig) {
+                if (!streamCommonExtension.preSinkConfigExecution(sinkConfig, 
milogSpaceData.getMilogSpaceId())) {
+                    continue;
+                }
                 List<LogtailConfig> logTailConfigs = 
sinkConfig.getLogtailConfigs();
                 if (logTailConfigs != null) {
                     for (LogtailConfig logTailConfig : logTailConfigs) {
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
index 69b63975..8c3bb0b7 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
@@ -37,4 +37,6 @@ public interface StreamCommonExtension {
     Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long, String>> 
config, String uniqueMark);
 
     Boolean preCheckTaskExecution(SinkConfig sinkConfig, LogtailConfig 
logTailConfig, Long logSpaceId);
+
+    Boolean preSinkConfigExecution(SinkConfig sinkConfig, Long logSpaceId);
 }
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
index c3232dbc..a0208446 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
@@ -43,6 +43,7 @@ public class DefaultStreamCommonExtension implements 
StreamCommonExtension {
         return data;
     }
 
+    @Override
     public Boolean checkUniqueMarkExists(String uniqueMarks, Map<String, 
Map<Long, String>> config) {
         String[] split = uniqueMarks.split(SYMBOL_COMMA);
         for (String s : split) {
@@ -53,6 +54,7 @@ public class DefaultStreamCommonExtension implements 
StreamCommonExtension {
         return false;
     }
 
+    @Override
     public Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long, 
String>> config, String uniqueMark) {
         return config.get(uniqueMark);
     }
@@ -61,4 +63,9 @@ public class DefaultStreamCommonExtension implements 
StreamCommonExtension {
     public Boolean preCheckTaskExecution(SinkConfig sinkConfig, LogtailConfig 
logTailConfig, Long logSpaceId) {
         return true;
     }
+
+    @Override
+    public Boolean preSinkConfigExecution(SinkConfig sinkConfig, Long 
logSpaceId) {
+        return true;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to