This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 4b833682 feat: optimize file monitoring and configuration management functions (#581)
4b833682 is described below
commit 4b83368267c3f1ec971e8d98533028bd0ea47333
Author: wtt <[email protected]>
AuthorDate: Tue May 13 15:22:44 2025 +0800
feat: optimize file monitoring and configuration management functions (#581)
---
.../log/agent/channel/ChannelServiceFactory.java | 3 +-
.../log/agent/channel/ChannelServiceImpl.java | 1 +
.../agent/channel/file/InodeFileComparator.java | 39 ++++++++++
.../listener/DefaultFileMonitorListener.java | 2 +-
.../service/impl/MilogConfigNacosServiceImpl.java | 85 ++++++++++------------
.../log/stream/config/MilogConfigListener.java | 6 ++
.../job/extension/StreamCommonExtension.java | 2 +
.../impl/DefaultStreamCommonExtension.java | 7 ++
8 files changed, 97 insertions(+), 48 deletions(-)
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index 20dc82b2..6801aa6b 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -43,8 +43,7 @@ public class ChannelServiceFactory {
private final AgentMemoryService agentMemoryService;
private final String memoryBasePath;
-
- private static final Pattern regexCharsPattern =
Pattern.compile("[*+?^${}()|\\[\\]\\\\]");
+ private static final Pattern regexCharsPattern =
Pattern.compile("[*+?^${}()\\[\\]\\\\]");
public ChannelServiceFactory(AgentMemoryService agentMemoryService, String
memoryBasePath) {
this.agentMemoryService = agentMemoryService;
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 6e3a1ece..6a284fee 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -610,6 +610,7 @@ public class ChannelServiceImpl extends AbstractChannelService {
return monitorFileList;
}
+ @Override
public ChannelMemory getChannelMemory() {
return channelMemory;
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
index 7713c4a4..0399341e 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
@@ -27,6 +27,9 @@ import org.apache.ozhera.log.agent.common.ChannelUtil;
import java.io.File;
import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* @author wtt
@@ -47,6 +50,14 @@ public class InodeFileComparator extends DefaultFileComparator {
private static final List<String> filePaths = Lists.newArrayList();
+ private static final ScheduledExecutorService cleanupExecutor =
Executors.newSingleThreadScheduledExecutor();
+
+
+ static {
+
cleanupExecutor.scheduleAtFixedRate(InodeFileComparator::cleanupStaleEntries,
+ 1, 1, TimeUnit.MINUTES);
+ }
+
@Override
public int compare(File file1, File file2) {
// log.info("InodeFileComparator compare
file1:{},file2:{},filePaths:{}", file1, file2, GSON.toJson(filePaths));
@@ -86,4 +97,32 @@ public class InodeFileComparator extends DefaultFileComparator {
log.info("InodeFileComparator remove file : {}", filePath);
filePaths.remove(filePath);
}
+
+ private static void cleanupStaleEntries() {
+ try {
+ log.debug("Starting cleanup of stale INODE_MAP entries");
+ Iterator<Map.Entry<String, Long>> iterator =
INODE_MAP.entrySet().iterator();
+ int removedCount = 0;
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, Long> entry = iterator.next();
+ String filePath = entry.getKey();
+ File file = new File(filePath);
+
+ if (!file.exists()) {
+ iterator.remove();
+ removedCount++;
+ log.debug("Removed stale entry for file: {}", filePath);
+ }
+ }
+
+ if (removedCount > 0) {
+ log.info("Cleaned up {} stale entries from INODE_MAP",
removedCount);
+ }
+ } catch (Exception e) {
+ log.error("Error during INODE_MAP cleanup", e);
+ }
+ }
+
+
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
index c8a5cc60..5cc6ece7 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
@@ -274,7 +274,7 @@ public class DefaultFileMonitorListener implements FileMonitorListener {
for (Map.Entry<List<MonitorFile>, ChannelService> channelServiceEntry
: pathChannelServiceMap.entrySet()) {
for (MonitorFile monitorFile : channelServiceEntry.getKey()) {
if (filterSuffixList.stream()
- .filter(s ->
monitorFile.getRealFilePath().contains(s)).findAny().isPresent()
+ .anyMatch(s ->
monitorFile.getRealFilePath().contains(s))
&&
monitorFile.getFilePattern().matcher(changedFilePath).matches()) {
serviceMap.put(monitorFile.getRealFilePath(),
channelServiceEntry.getValue());
}
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 354aaf55..3e2261a3 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -335,63 +335,58 @@ public class MilogConfigNacosServiceImpl implements MilogConfigNacosService {
}
// Delete configuration -- Delete log-tail
if (OperateEnum.DELETE_OPERATE.getCode().equals(type) &&
!LOG_STORE.equalsIgnoreCase(changeType)) {
- if (null != existConfig) {
- List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
- SinkConfig currentStoreConfig = spaceConfig.stream()
- .filter(sinkConfig ->
sinkConfig.getLogstoreId().equals(storeId))
- .findFirst()
- .orElse(null);
- if (null != currentStoreConfig) {
- List<LogtailConfig> logTailConfigs =
currentStoreConfig.getLogtailConfigs();
- List<LogtailConfig> logtailConfigList = new
ArrayList<>(logTailConfigs);
-
- if (null != tailId &&
CollectionUtils.isNotEmpty(logTailConfigs) &&
- logTailConfigs.stream().anyMatch(config ->
config.getLogtailId().equals(tailId))) {
- logtailConfigList.removeIf(logtailConfig ->
logtailConfig.getLogtailId().equals(tailId));
- }
- currentStoreConfig.setLogtailConfigs(logtailConfigList);
+ List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
+ SinkConfig currentStoreConfig = spaceConfig.stream()
+ .filter(sinkConfig ->
sinkConfig.getLogstoreId().equals(storeId))
+ .findFirst()
+ .orElse(null);
+ if (null != currentStoreConfig) {
+ List<LogtailConfig> logTailConfigs =
currentStoreConfig.getLogtailConfigs();
+ List<LogtailConfig> logtailConfigList = new
ArrayList<>(logTailConfigs);
+
+ if (null != tailId &&
CollectionUtils.isNotEmpty(logTailConfigs) &&
+ logTailConfigs.stream().anyMatch(config -> null !=
config.getLogtailId() && config.getLogtailId().equals(tailId))) {
+ logtailConfigList.removeIf(logtailConfig -> null !=
logtailConfig.getLogtailId() &&
+ logtailConfig.getLogtailId().equals(tailId));
}
+ currentStoreConfig.setLogtailConfigs(logtailConfigList);
}
}
// Delete configuration -- Delete log-tail
if (OperateEnum.DELETE_OPERATE.getCode().equals(type) &&
LOG_STORE.equalsIgnoreCase(changeType)) {
- if (null != existConfig) {
- List<SinkConfig> sinkConfigListDelStore =
existConfig.getSpaceConfig().stream()
- .filter(sinkConfig ->
!storeId.equals(sinkConfig.getLogstoreId()))
- .collect(Collectors.toList());
- existConfig.setSpaceConfig(sinkConfigListDelStore);
- }
+ List<SinkConfig> sinkConfigListDelStore =
existConfig.getSpaceConfig().stream()
+ .filter(sinkConfig ->
!storeId.equals(sinkConfig.getLogstoreId()))
+ .collect(Collectors.toList());
+ existConfig.setSpaceConfig(sinkConfigListDelStore);
}
// Modify the configuration -- find a specific tail under this store
to make changes
if (OperateEnum.UPDATE_OPERATE.getCode().equals(type)) {
- if (null != existConfig) {
- List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
- //Compare whether the store has changed
- SinkConfig newSinkConfig = assembleSinkConfig(storeId, tailId,
motorRoomEn);
- SinkConfig currentStoreConfig = spaceConfig.stream()
- .filter(sinkConfig ->
sinkConfig.getLogstoreId().equals(storeId))
+ List<SinkConfig> spaceConfig = existConfig.getSpaceConfig();
+ //Compare whether the store has changed
+ SinkConfig newSinkConfig = assembleSinkConfig(storeId, tailId,
motorRoomEn);
+ SinkConfig currentStoreConfig = spaceConfig.stream()
+ .filter(sinkConfig ->
sinkConfig.getLogstoreId().equals(storeId))
+ .findFirst()
+ .orElse(null);
+ if (null != currentStoreConfig) {
+ if (!newSinkConfig.equals(currentStoreConfig)) {
+ currentStoreConfig.updateStoreParam(newSinkConfig);
+ }
+ // Find the specific tail under the old store
+ LogtailConfig filterLogTailConfig =
currentStoreConfig.getLogtailConfigs().stream()
+ .filter(logTailConfig -> Objects.equals(tailId,
logTailConfig.getLogtailId()))
.findFirst()
.orElse(null);
- if (null != currentStoreConfig) {
- if (!newSinkConfig.equals(currentStoreConfig)) {
- currentStoreConfig.updateStoreParam(newSinkConfig);
- }
- // Find the specific tail under the old store
- LogtailConfig filterLogTailConfig =
currentStoreConfig.getLogtailConfigs().stream()
- .filter(logTailConfig -> Objects.equals(tailId,
logTailConfig.getLogtailId()))
- .findFirst()
- .orElse(null);
- if (null != filterLogTailConfig) {
-
BeanUtil.copyProperties(assembleLogTailConfigs(tailId), filterLogTailConfig);
- } else {
- log.info("query logtailConfig no designed
config,tailId:{},insert", tailId);
-
currentStoreConfig.getLogtailConfigs().add(assembleLogTailConfigs(tailId));
- }
+ if (null != filterLogTailConfig) {
+ BeanUtil.copyProperties(assembleLogTailConfigs(tailId),
filterLogTailConfig);
} else {
- //Does not exist, new
- //New addition to the logstore
- spaceConfig.add(assembleSinkConfig(storeId, tailId,
motorRoomEn));
+ log.info("query logtailConfig no designed
config,tailId:{},insert", tailId);
+
currentStoreConfig.getLogtailConfigs().add(assembleLogTailConfigs(tailId));
}
+ } else {
+ //Does not exist, new
+ //New addition to the logstore
+ spaceConfig.add(assembleSinkConfig(storeId, tailId,
motorRoomEn));
}
}
return existConfig;
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index 6b657be1..b0d31c9e 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -96,6 +96,9 @@ public class MilogConfigListener {
List<SinkConfig> sinkConfigs = newMilogSpaceData.getSpaceConfig();
stopUnusedOldStoreJobs(sinkConfigs);
for (SinkConfig sinkConfig : sinkConfigs) {
+ if (!streamCommonExtension.preSinkConfigExecution(sinkConfig,
milogSpaceData.getMilogSpaceId())) {
+ continue;
+ }
stopOldJobsForRemovedTailIds(sinkConfig);
if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
//Whether the submission store information changes, the
change stops
@@ -246,6 +249,9 @@ public class MilogConfigListener {
List<SinkConfig> newSpaceConfig = newMilogSpaceData.getSpaceConfig();
if (newSpaceConfig != null) {
for (SinkConfig sinkConfig : newSpaceConfig) {
+ if (!streamCommonExtension.preSinkConfigExecution(sinkConfig,
milogSpaceData.getMilogSpaceId())) {
+ continue;
+ }
List<LogtailConfig> logTailConfigs =
sinkConfig.getLogtailConfigs();
if (logTailConfigs != null) {
for (LogtailConfig logTailConfig : logTailConfigs) {
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
index 69b63975..8c3bb0b7 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/StreamCommonExtension.java
@@ -37,4 +37,6 @@ public interface StreamCommonExtension {
Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long, String>>
config, String uniqueMark);
Boolean preCheckTaskExecution(SinkConfig sinkConfig, LogtailConfig
logTailConfig, Long logSpaceId);
+
+ Boolean preSinkConfigExecution(SinkConfig sinkConfig, Long logSpaceId);
}
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
index c3232dbc..a0208446 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
@@ -43,6 +43,7 @@ public class DefaultStreamCommonExtension implements StreamCommonExtension {
return data;
}
+ @Override
public Boolean checkUniqueMarkExists(String uniqueMarks, Map<String,
Map<Long, String>> config) {
String[] split = uniqueMarks.split(SYMBOL_COMMA);
for (String s : split) {
@@ -53,6 +54,7 @@ public class DefaultStreamCommonExtension implements StreamCommonExtension {
return false;
}
+ @Override
public Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long,
String>> config, String uniqueMark) {
return config.get(uniqueMark);
}
@@ -61,4 +63,9 @@ public class DefaultStreamCommonExtension implements StreamCommonExtension {
public Boolean preCheckTaskExecution(SinkConfig sinkConfig, LogtailConfig
logTailConfig, Long logSpaceId) {
return true;
}
+
+ @Override
+ public Boolean preSinkConfigExecution(SinkConfig sinkConfig, Long
logSpaceId) {
+ return true;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]