This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 90b3d621 refactor: optimize log collection and parsing logic (#605)
90b3d621 is described below

commit 90b3d6218edaea9d732bbf7d29415d4aa87a9366
Author: wtt <[email protected]>
AuthorDate: Thu Sep 25 15:14:37 2025 +0800

    refactor: optimize log collection and parsing logic (#605)
    
    * refactor: optimize log collection and parsing logic
    
    * refactor: adding Apache license header for FilterIdEnum.java
---
 .../app/service/impl/HeraAppServiceImpl.java       | 15 +++--
 .../log/agent/channel/AbstractChannelService.java  |  4 +-
 .../log/agent/channel/ChannelServiceFactory.java   | 20 +++++-
 .../log/agent/channel/ChannelServiceImpl.java      | 26 ++++----
 .../agent/channel/WildcardChannelServiceImpl.java  | 26 ++++----
 .../apache/ozhera/log/api/enums/FilterIdEnum.java  | 40 ++++++++++++
 .../ozhera/log/api/model/msg/LineMessage.java      |  2 +-
 .../org/apache/ozhera/log/common/PathUtils.java    | 16 ++---
 .../apache/ozhera/log/parse/AbstractLogParser.java |  4 +-
 .../apache/ozhera/log/parse/PlaceholderParser.java |  4 +-
 .../apache/ozhera/log/common/LogParserTest.java    | 22 ++++++-
 .../apache/ozhera/log/common/PathUtilsTest.java    |  4 +-
 .../extension/common/CommonExtensionService.java   |  3 +
 .../common/DefaultCommonExtensionService.java      | 12 +++-
 .../service/impl/HeralogHomePageServiceImpl.java   | 76 +++++++++++++---------
 .../service/impl/MilogConfigNacosServiceImpl.java  |  7 +-
 .../nacos/impl/SpaceConfigNacosPublisher.java      |  5 ++
 .../nacos/impl/StreamConfigNacosProvider.java      |  7 +-
 .../nacos/impl/StreamConfigNacosPublisher.java     |  8 ++-
 .../ozhera/log/stream/job/LogDataTransfer.java     |  8 +--
 20 files changed, 208 insertions(+), 101 deletions(-)

diff --git 
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
 
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
index 4c08b6c6..6d886ac7 100644
--- 
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
+++ 
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
@@ -22,6 +22,10 @@ import 
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dubbo.config.annotation.Service;
 import org.apache.ozhera.app.api.model.HeraAppBaseInfoModel;
 import org.apache.ozhera.app.api.model.HeraAppBaseInfoParticipant;
 import org.apache.ozhera.app.api.model.HeraAppBaseQuery;
@@ -37,10 +41,6 @@ import org.apache.ozhera.app.model.HeraAppExcessInfo;
 import org.apache.ozhera.app.model.HeraAppRole;
 import org.apache.ozhera.app.service.HeraAppRoleService;
 import org.apache.ozhera.app.service.extension.AppTypeServiceExtension;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.dubbo.config.annotation.Service;
 import org.jetbrains.annotations.Nullable;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -76,6 +76,8 @@ public class HeraAppServiceImpl implements HeraAppService {
 
     private final AppTypeServiceExtension appTypeServiceExtension;
 
+    private final Integer DEFAULT_LIMIT = 2000;
+
     public HeraAppServiceImpl(HeraAppBaseInfoMapper heraAppBaseInfoMapper, 
HeraAppExcessInfoMapper heraAppExcessInfoMapper, HeraAppRoleService 
roleService, HeraAppRoleMapper heraAppRoleMapper, AppTypeServiceExtension 
appTypeServiceExtension) {
         this.heraAppBaseInfoMapper = heraAppBaseInfoMapper;
         this.heraAppExcessInfoMapper = heraAppExcessInfoMapper;
@@ -101,6 +103,9 @@ public class HeraAppServiceImpl implements HeraAppService {
                 return appBaseInfo;
             }).collect(Collectors.toList());
         }
+        if (appBaseInfos.size() > DEFAULT_LIMIT) {
+            return new 
ArrayList<>(appBaseInfos.stream().limit(DEFAULT_LIMIT).toList());
+        }
         return appBaseInfos;
     }
 
@@ -115,7 +120,7 @@ public class HeraAppServiceImpl implements HeraAppService {
                 platformType = appTypeServiceExtension.getAppPlatForm(type);
             }
             appBaseInfos = heraAppBaseInfoMapper.queryAppInfo(appName, 
platformType, appType);
-        }else{
+        } else {
             appBaseInfos = heraAppBaseInfoMapper.queryLatestAppInfo(limit, 
platformType, appType);
         }
         if (CollectionUtils.isNotEmpty(appBaseInfos)) {
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index a82bbdce..275df882 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -173,14 +173,14 @@ public abstract class AbstractChannelService implements 
ChannelService {
         }));
     }
 
-    protected LineMessage createLineMessage(String lineMsg, 
AtomicReference<ReadResult> readResult, String pattern, String patternCode, 
String ip, long ct) {
+    protected LineMessage createLineMessage(String lineMsg, 
AtomicReference<ReadResult> readResult, String pattern, String patternCode, 
long ct) {
         LineMessage lineMessage = new LineMessage();
         lineMessage.setMsgBody(lineMsg);
         lineMessage.setPointer(readResult.get().getPointer());
         lineMessage.setLineNumber(readResult.get().getLineNumber());
         lineMessage.setFileName(pattern);
         lineMessage.setProperties(LineMessage.KEY_MQ_TOPIC_TAG, patternCode);
-        lineMessage.setProperties(LineMessage.KEY_IP, ip);
+        lineMessage.setProperties(LineMessage.KEY_IP, getTailPodIp(pattern));
         lineMessage.setProperties(LineMessage.KEY_COLLECT_TIMESTAMP, 
String.valueOf(ct));
 
         String logType = getChannelDefine().getInput().getType();
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index f49e9876..6d2f042e 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -30,6 +30,7 @@ import org.apache.ozhera.log.api.enums.LogTypeEnum;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
@@ -49,7 +50,7 @@ public class ChannelServiceFactory {
 
     private final AgentMemoryService agentMemoryService;
     private final String memoryBasePath;
-    private static final Pattern regexCharsPattern = 
Pattern.compile("[*+?^${}()\\[\\]\\\\]");
+    private static final Pattern REGEX_CHARS_PATTERN = 
Pattern.compile("[*+?^${}\\[\\]]");
 
     private static List<String> multiSpecialFileSuffix;
 
@@ -115,7 +116,22 @@ public class ChannelServiceFactory {
         if (StringUtils.isEmpty(logPattern)) {
             return false;
         }
-        return regexCharsPattern.matcher(logPattern).find();
+
+        boolean hasGroup = logPattern.contains("(") && 
logPattern.contains(")") && logPattern.contains("|");
+        boolean hasRegexChars = REGEX_CHARS_PATTERN.matcher(logPattern).find();
+
+        // Case 1: Contains regular key symbols (*, ?, ^, $, { }, [ ], \, .) → 
must be regular
+        if (hasRegexChars) {
+            return true;
+        }
+
+        // Case 2: Only include () and | → enumeration, not regular
+        if (hasGroup) {
+            return false;
+        }
+
+        // Case 3: Normal path
+        return false;
     }
 
     private ChannelService createStandardChannelService(MsgExporter exporter,
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 07c5e325..7efdf610 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -234,7 +234,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     private void startCollectFile(Long channelId, Input input, List<String> 
patterns) {
         for (int i = 0; i < patterns.size(); i++) {
             log.info("startCollectFile,total file:{},start:{},remain:{}", 
patterns.size(), i + 1, patterns.size() - (i + 1));
-            readFile(input.getPatternCode(), getTailPodIp(patterns.get(i)), 
patterns.get(i), channelId);
+            readFile(input.getPatternCode(), getTailPodIp(patterns.get(i)), 
channelId);
             InodeFileComparator.addFile(patterns.get(i));
         }
         lastLineRemainSendSchedule(input.getPatternCode());
@@ -242,13 +242,11 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
 
     private void handleAllFileCollectMonitor(String patternCode, String 
newFilePath, Long channelId) {
-        String ip = getTailPodIp(newFilePath);
-
         if (logFileMap.keySet().stream().anyMatch(key -> 
Objects.equals(newFilePath, key))) {
             log.info("collectOnce open file:{}", newFilePath);
             logFileMap.get(newFilePath).setReOpen(true);
         } else {
-            readFile(patternCode, ip, newFilePath, channelId);
+            readFile(patternCode, newFilePath, channelId);
         }
     }
 
@@ -296,7 +294,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         monitorFileList.add(MonitorFile.of(configPath, 
cleanedPathList.getFirst(), logTypeEnum, collectOnce));
     }
 
-    private ReadListener initFileReadListener(MLog mLog, String patternCode, 
String ip, String pattern) {
+    private ReadListener initFileReadListener(MLog mLog, String patternCode, 
String pattern) {
         AtomicReference<ReadResult> readResult = new AtomicReference<>();
         ReadListener listener = new DefaultReadListener(event -> {
             readResult.set(event.getReadResult());
@@ -318,7 +316,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                         if (null != l) {
                             try {
                                 fileColLock.lock();
-                                wrapDataToSend(l, readResult, pattern, 
patternCode, ip, ct);
+                                wrapDataToSend(l, readResult, pattern, 
patternCode, ct);
                             } finally {
                                 fileColLock.unlock();
                             }
@@ -347,7 +345,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                             String remainMsg = mLog.takeRemainMsg2();
                             if (null != remainMsg) {
                                 log.info("start send last 
line,pattern:{},patternCode:{},ip:{},data:{}", pattern, patternCode, 
getTailPodIp(pattern), remainMsg);
-                                wrapDataToSend(remainMsg, 
referenceEntry.getValue().getValue(), pattern, patternCode, 
getTailPodIp(pattern), appendTime);
+                                wrapDataToSend(remainMsg, 
referenceEntry.getValue().getValue(), pattern, patternCode, appendTime);
                             }
                         } finally {
                             fileColLock.unlock();
@@ -358,8 +356,8 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         }), 30, 30, TimeUnit.SECONDS);
     }
 
-    private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String pattern, String patternCode, String ip, long ct) {
-        LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
pattern, patternCode, ip, ct);
+    private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String pattern, String patternCode, long ct) {
+        LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
pattern, patternCode, ct);
 
         updateChannelMemory(channelMemory, pattern, logTypeEnum, ct, 
readResult);
         lineMessageList.add(lineMessage);
@@ -372,14 +370,12 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         }
     }
 
-    private void readFile(String patternCode, String ip, String filePath, Long 
channelId) {
+    private void readFile(String patternCode, String filePath, Long channelId) 
{
         MLog mLog = new MLog();
         if (StringUtils.isNotBlank(this.linePrefix)) {
             mLog.setCustomLinePattern(this.linePrefix);
         }
-        String usedIp = StringUtils.isBlank(ip) ? NetUtil.getLocalIp() : ip;
-
-        ReadListener listener = initFileReadListener(mLog, patternCode, 
usedIp, filePath);
+        ReadListener listener = initFileReadListener(mLog, patternCode, 
filePath);
         Map<String, ChannelMemory.FileProgress> fileProgressMap = 
channelMemory.getFileProgressMap();
         printMapToJson(fileProgressMap, collectOnce);
 
@@ -399,7 +395,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                     logFile.readLine();
                 } catch (Exception e) {
                     logFile.setExceptionFinish();
-                    log.error("logFile read line 
err,channelId:{},localIp:{},file:{},patternCode:{}", channelId, usedIp, 
fileProgressMap, patternCode, e);
+                    log.error("logFile read line 
err,channelId:{},file:{},patternCode:{}", channelId, fileProgressMap, 
patternCode, e);
                 }
             });
             futureMap.put(filePath, future);
@@ -568,7 +564,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
             String ip = StringUtils.isBlank(tailPodIp) ? NetUtil.getLocalIp() 
: tailPodIp;
             if (null == logFile || logFile.getExceptionFinish()) {
                 // Add new log file
-                readFile(channelDefine.getInput().getPatternCode(), ip, 
filePath, getChannelId());
+                readFile(channelDefine.getInput().getPatternCode(), filePath, 
getChannelId());
                 log.info("watch new file create for 
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
             } else {
                 handleExistingLogFileWithRetry(logFile, filePath, ip);
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 0d261870..0b401efd 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -135,7 +135,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
         }
         memoryService.cleanChannelMemoryContent(channelId, patterns);
 
-        startCollectFile(channelId, input, getTailPodIp(logPattern));
+        startCollectFile(channelId, input);
 
         startExportQueueDataThread();
         memoryService.refreshMemory(channelMemory);
@@ -143,13 +143,13 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     }
 
-    private void startCollectFile(Long channelId, Input input, String ip) {
+    private void startCollectFile(Long channelId, Input input) {
         try {
             // Load the restart file
             String restartFile = buildRestartFilePath();
             FileInfoCache.ins().load(restartFile);
 
-            fileMonitor = createFileMonitor(input.getPatternCode(), ip);
+            fileMonitor = createFileMonitor(input.getPatternCode());
 
             String fileExpression = buildFileExpression(input.getLogPattern());
 
@@ -166,7 +166,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
                 
fileCollFutures.add(getExecutorServiceByType(getLogTypeEnum()).submit(() -> 
monitorFileChanges(fileMonitor, monitorPath, pattern)));
             }
         } catch (Exception e) {
-            log.error("startCollectFile error, channelId: {}, input: {}, ip: 
{}", channelId, GSON.toJson(input), ip, e);
+            log.error("startCollectFile error, channelId: {}, input: {}", 
channelId, GSON.toJson(input), e);
         }
     }
 
@@ -257,7 +257,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
     }
 
 
-    private HeraFileMonitor createFileMonitor(String patternCode, String ip) {
+    private HeraFileMonitor createFileMonitor(String patternCode) {
         MLog mLog = new MLog();
         if (StringUtils.isNotBlank(this.linePrefix)) {
             mLog.setCustomLinePattern(this.linePrefix);
@@ -272,7 +272,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
                 log.info("Empty data");
                 return;
             }
-            processLogLines(readResult, patternCode, ip, mLog);
+            processLogLines(readResult, patternCode, mLog);
         });
 
         monitor.setListener(defaultMonitorListener);
@@ -280,11 +280,11 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
         /**
          * Collect all data in the last row of data that has not been sent for 
more than 10 seconds.
          */
-        scheduleLastLineSender(mLog, readResult, patternCode, ip);
+        scheduleLastLineSender(mLog, readResult, patternCode);
         return monitor;
     }
 
-    private void processLogLines(AtomicReference<ReadResult> readResult, 
String patternCode, String ip, MLog mLog) {
+    private void processLogLines(AtomicReference<ReadResult> readResult, 
String patternCode, MLog mLog) {
         long currentTime = System.currentTimeMillis();
         ReadResult result = readResult.get();
 
@@ -297,7 +297,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             if (line != null) {
                 try {
                     reentrantLock.lock();
-                    wrapDataToSend(line, readResult, patternCode, ip, 
currentTime);
+                    wrapDataToSend(line, readResult, patternCode, currentTime);
                 } finally {
                     reentrantLock.unlock();
                 }
@@ -307,7 +307,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
         });
     }
 
-    private void scheduleLastLineSender(MLog mLog, AtomicReference<ReadResult> 
readResult, String patternCode, String ip) {
+    private void scheduleLastLineSender(MLog mLog, AtomicReference<ReadResult> 
readResult, String patternCode) {
         lastFileLineScheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
             Long appendTime = mLog.getAppendTime();
             if (appendTime != null && Instant.now().toEpochMilli() - 
appendTime > 10 * 1000) {
@@ -316,7 +316,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
                         String remainMsg = mLog.takeRemainMsg2();
                         if (null != remainMsg) {
                             log.info("start send last line, fileName:{}, 
patternCode:{}, data:{}", readResult.get().getFilePathName(), patternCode, 
remainMsg);
-                            wrapDataToSend(remainMsg, readResult, patternCode, 
ip, Instant.now().toEpochMilli());
+                            wrapDataToSend(remainMsg, readResult, patternCode, 
Instant.now().toEpochMilli());
                         }
                     } finally {
                         reentrantLock.unlock();
@@ -326,9 +326,9 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
         }, 30, 30, TimeUnit.SECONDS);
     }
 
-    private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String patternCode, String localIp, long ct) {
+    private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String patternCode, long ct) {
         String filePathName = readResult.get().getFilePathName();
-        LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
filePathName, patternCode, localIp, ct);
+        LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
filePathName, patternCode, ct);
         updateChannelMemory(channelMemory, filePathName, getLogTypeEnum(), ct, 
readResult);
 
         lineMessageList.add(lineMessage);
diff --git 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/enums/FilterIdEnum.java
 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/enums/FilterIdEnum.java
new file mode 100644
index 00000000..402a9dbc
--- /dev/null
+++ 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/enums/FilterIdEnum.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.api.enums;
+
+import lombok.Getter;
+
+/**
+ * @author wtt
+ * @date 2025/9/15 9:58
+ * @version 1.0
+ */
+@Getter
+public enum FilterIdEnum {
+    FILTER_SPACE_ID(1, "space"),
+    FILTER_STORE_ID(2, "store"),
+    FILTER_TAIL_ID(3, "tail");
+    final int code;
+    final String desc;
+
+    FilterIdEnum(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+}
diff --git 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
index ae81d570..3b11a700 100644
--- 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
+++ 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
@@ -54,7 +54,7 @@ public class LineMessage implements Serializable {
     public LineMessage() {
     }
 
-    public long getTimestamp() {
+    public long extractTimestamp() {
         String value = extMap.get(KEY_COLLECT_TIMESTAMP);
         if (value == null || "".equals(value)) {
             return 0;
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
index b3524c7c..8efb2a6b 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
@@ -242,12 +242,10 @@ public class PathUtils {
     }
 
     private static void readFile(String filepath, String fileName, 
List<String> list) throws FileNotFoundException, IOException {
-        try {
-            File file = new File(filepath);
-            if (!file.isDirectory()) {
-                return;
-            } else if (file.isDirectory()) {
-                String[] fileList = file.list();
+        File file = new File(filepath);
+        if (file.isDirectory()) {
+            String[] fileList = file.list();
+            if (null != fileList && fileList.length > 0) {
                 for (int i = 0; i < fileList.length; i++) {
                     String subPath;
                     if (filepath.endsWith("/")) {
@@ -263,14 +261,8 @@ public class PathUtils {
                         readFile(subPath, fileName, list);
                     }
                 }
-
             }
-        } catch (FileNotFoundException e) {
-            throw e;
-        } catch (IOException e) {
-            throw e;
         }
-        return;
     }
 
     private static void readFile(String filepath, String fileName, String 
dictionaries, List<String> list) throws FileNotFoundException, IOException {
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index 50ad6a3f..558d437e 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -129,7 +129,7 @@ public abstract class AbstractLogParser implements 
LogParser {
         if (!ret.containsKey(esKeyMap_timestamp) && 
logData.startsWith(LOG_PREFIX)) {
             String timeStamp = StringUtils.substringBetween(logData, 
LOG_PREFIX, LOG_SUFFFIX);
             Long time = getTimestampFromString(timeStamp, collectStamp);
-            ret.put(esKeyMap_timestamp, time);
+            ret.put(esKeyMap_timestamp, time != null ? time : 
System.currentTimeMillis());
         }
         /**
          * Special handling, only dates starting with a date in the file such 
as yyyy-mm-dd HH:mm:ss will be extracted
@@ -137,7 +137,7 @@ public abstract class AbstractLogParser implements 
LogParser {
         if (!ret.containsKey(esKeyMap_timestamp) && 
logData.startsWith(specialTimePrefix)) {
             String timeStamp = StringUtils.substring(logData, 0, 
specialTimeLength);
             Long time = getTimestampFromString(timeStamp, collectStamp);
-            ret.put(esKeyMap_timestamp, time);
+            ret.put(esKeyMap_timestamp, time != null ? time : 
System.currentTimeMillis());
         }
     }
 
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
index 6987a1ab..a76dd2ab 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
@@ -165,13 +165,13 @@ public class PlaceholderParser extends AbstractLogParser {
                 } else {
                     // 静态部分严格匹配
                     if (!remaining.startsWith(part.text)) {
-                        return Collections.emptyMap();
+                        return new HashMap<>();
                     }
                     remaining = remaining.substring(part.text.length());
                 }
             }
         } catch (Exception e) {
-            return Collections.emptyMap();
+            return new HashMap<>();
         }
 
         return result;
diff --git 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index d419cf66..0e809025 100644
--- 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++ 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -151,7 +151,27 @@ public class LogParserTest {
         String parseScript = "[%s %s[2@ ] %s[-]] -[trace_id=%s] %s - %s";
         String logData = "[INFO 2025-09-03 09:47:05.233 http-nio-1115-exec-4] 
-[trace_id=47f8940234d777de50d17685171a6183] 
com.test.update.web.controller.v0.UpdatesController - request info 
d:sapphiren_id_global to test";
         Long collectStamp = Instant.now().toEpochMilli();
-        Integer parserType = 
LogParserFactory.LogParserEnum.PLACEHOLDER_PARSE.getCode();
+        Integer parserType = 
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
+        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, 
keyOrderList);
+        Map<String, Object> parse = customParse.parse(logData, "", 1l, 
collectStamp, "");
+        System.out.println(parse);
+        List<String> dataList = customParse.parseLogData(logData);
+
+        System.out.println(customParse.getTimestampFromString("2023-08-25 
10:46:09.239", collectStamp));
+        stopwatch.stop();
+        log.info("cost time:{}", stopwatch.elapsed().toMillis());
+    }
+
+    @Test
+    public void LogPlaceholderParserTest1() throws Exception {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String keyList = 
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:text,podName:keyword,ng_id:text,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+        String keyOrderList = 
"timestamp:1,level:1,traceId:1,threadName:1,className:1,line:1,methodName:1,message:1,podName:1,ng_id:2,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+        String valueList = "0,2,5,4,-1,6,-1,7,1,3";
+        String parseScript = "[%s]-[%s]-[%s]-[%s]-[%s]-[%s]-[%s]-%s";
+        String logData = "[2025-09-16 14:50:22.718] 
[mit-after-sales-asp-spareparts-service-1201735-5486d457fd-hrcmk] [INFO] [] 
[DubboServerHandler-10.158.197.220:20880-thread-196] 
[fa7ff7698c88a87dd9f5d1541ee33130] [?.?(?:?)] transferTypeValueIllegalNew 
removed, 
values:[C015520011700],rules:[{\\\"id\\\":805868,\\\"ruleId\\\":2674,\\\"type\\\":15,\\\"value\\\":\\\"SCNE010076600\\\"}]\\r\\n";
+        Long collectStamp = Instant.now().toEpochMilli();
+        Integer parserType = 
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
         LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, 
keyOrderList);
         Map<String, Object> parse = customParse.parse(logData, "", 1l, 
collectStamp, "");
         System.out.println(parse);
diff --git 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
index b2ad8170..f46fb8d5 100644
--- 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
+++ 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
@@ -219,10 +219,10 @@ public class PathUtilsTest {
 
     @Test
     public void test81() {
-        String logPattern = "/home/work/log/error-2022-08-04_05_1.log";
+        String logPattern = "/home/work/logs/.*_total.log";
         Pattern pattern = Pattern.compile(logPattern);
 
-        Assert.assertEquals(false, 
pattern.matcher("/home/work/log/aa/server.log").matches());
+        Assert.assertEquals(true, 
pattern.matcher("/home/work/logs/log_total.log").matches());
         Assert.assertEquals(false, 
pattern.matcher("/home/work/log/bb/server.log").matches());
 
         Assert.assertEquals(false, 
pattern.matcher("/home/work/log/aa/bb/server.log").matches());
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
index f0e1086b..a2e7436b 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ozhera.log.manager.service.extension.common;
 
+import org.apache.ozhera.log.api.enums.FilterIdEnum;
 import org.apache.ozhera.log.manager.model.vo.LogQuery;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
@@ -56,4 +57,6 @@ public interface CommonExtensionService {
     String getSpaceDataId(Long spaceId);
 
     List<String> queryMachineRegions();
+
+    Boolean matchesCondition(Long space, FilterIdEnum idEnum, Long id);
 }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
index 947782d5..679b022a 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
@@ -18,12 +18,13 @@
  */
 package org.apache.ozhera.log.manager.service.extension.common;
 
-import org.apache.ozhera.log.api.enums.MQSourceEnum;
-import org.apache.ozhera.log.api.enums.MachineRegionEnum;
-import org.apache.ozhera.log.manager.model.vo.LogQuery;
 import com.xiaomi.youpin.docean.anno.Service;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.api.enums.FilterIdEnum;
+import org.apache.ozhera.log.api.enums.MQSourceEnum;
+import org.apache.ozhera.log.api.enums.MachineRegionEnum;
+import org.apache.ozhera.log.manager.model.vo.LogQuery;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
@@ -106,6 +107,11 @@ public class DefaultCommonExtensionService implements 
CommonExtensionService {
         return 
Arrays.stream(MachineRegionEnum.values()).map(MachineRegionEnum::getEn).collect(Collectors.toList());
     }
 
+    @Override
+    public Boolean matchesCondition(Long spaceId, FilterIdEnum idEnum, Long 
id) {
+        return true;
+    }
+
     @Getter
     public static enum QueryTypeEnum {
         ID,
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
index d4c8373f..878d6a9d 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
@@ -19,7 +19,11 @@
 package org.apache.ozhera.log.manager.service.impl;
 
 import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.ozhera.app.api.response.AppBaseInfo;
+import org.apache.ozhera.log.api.enums.FilterIdEnum;
 import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.common.Result;
 import org.apache.ozhera.log.exception.CommonError;
@@ -34,9 +38,8 @@ import 
org.apache.ozhera.log.manager.model.pojo.MilogLogStoreDO;
 import org.apache.ozhera.log.manager.model.pojo.MilogLogTailDo;
 import org.apache.ozhera.log.manager.model.pojo.MilogStoreSpaceAuth;
 import org.apache.ozhera.log.manager.service.HeralogHomePageService;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
+import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionService;
+import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
 import org.jetbrains.annotations.Nullable;
 
 import javax.annotation.Resource;
@@ -44,27 +47,28 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 public class HeralogHomePageServiceImpl implements HeralogHomePageService {
-    
+
     @Resource
     private MilogLogTailDao milogLogtailDao;
-    
+
     @Resource
     private HeraAppServiceImpl heraAppService;
-    
+
     @Resource
     private MilogLogstoreDao milogLogstoreDao;
-    
+
     @Resource
     private MilogStoreSpaceAuthDao milogStoreSpaceAuthDao;
-    
+
     private List<ValueDTO<String>> milogpattern;
-    
+
+    private CommonExtensionService commonExtensionService;
+
     {
         String pattern = Config.ins().get("milogpattern", "");
         String[] split = pattern.split(",");
@@ -74,7 +78,11 @@ public class HeralogHomePageServiceImpl implements 
HeralogHomePageService {
         }
         milogpattern = valueDTOS;
     }
-    
+
+    public void init() {
+        commonExtensionService = 
CommonExtensionServiceFactory.getCommonExtensionService();
+    }
+
     @Override
     public Result<Map<String, Object>> milogAccess() {
         Long total = heraAppService.getAppCount();
@@ -84,7 +92,7 @@ public class HeralogHomePageServiceImpl implements 
HeralogHomePageService {
         map.put("access", access);
         return new Result<>(CommonError.Success.getCode(), 
CommonError.Success.getMessage(), map);
     }
-    
+
     @Override
     public Result<List<UnAccessAppDTO>> unAccessAppList() {
         List<AppBaseInfo> appBaseInfos = heraAppService.queryAllExistsApp();
@@ -99,30 +107,33 @@ public class HeralogHomePageServiceImpl implements 
HeralogHomePageService {
         }
         return new Result<>(CommonError.Success.getCode(), 
CommonError.Success.getMessage(), list);
     }
-    
+
     @Override
     public Result<List<MilogSpaceTreeDTO>> getMilogSpaceTree(Long spaceId) {
         List<MilogLogStoreDO> stores = getMilogLogStoreDOS(spaceId);
-        List<MilogSpaceTreeDTO> spaceTreeDTOS = 
stores.stream().map(milogLogstoreDO -> {
-            Long logstoreDOId = milogLogstoreDO.getId();
+        assert stores != null;
+        List<MilogSpaceTreeDTO> spaceTrees = 
stores.stream().map(milogLogstoreDO -> {
+            Long logStoreDoId = milogLogstoreDO.getId();
             MilogSpaceTreeDTO milogSpaceTreeDTO = new MilogSpaceTreeDTO();
             milogSpaceTreeDTO.setLabel(milogLogstoreDO.getLogstoreName());
-            milogSpaceTreeDTO.setValue(logstoreDOId);
-            List<MilogLogTailDo> logTailDos = 
milogLogtailDao.getMilogLogtailByStoreId(logstoreDOId);
+            milogSpaceTreeDTO.setValue(logStoreDoId);
+            List<MilogLogTailDo> logTailDos = 
milogLogtailDao.getMilogLogtailByStoreId(logStoreDoId);
             if (CollectionUtils.isNotEmpty(logTailDos)) {
-                List<MapDTO<String, Long>> collect = 
logTailDos.stream().map(logTailDo -> {
-                    MapDTO<String, Long> mapDTO = new MapDTO();
-                    mapDTO.setValue(logTailDo.getId());
-                    mapDTO.setLabel(logTailDo.getTail());
-                    return mapDTO;
-                }).collect(Collectors.toList());
+                List<MapDTO<String, Long>> collect = logTailDos.stream()
+                        .filter(logTailDo -> 
commonExtensionService.matchesCondition(spaceId, FilterIdEnum.FILTER_TAIL_ID, 
logTailDo.getId()))
+                        .map(logTailDo -> {
+                            MapDTO<String, Long> mapDTO = new MapDTO<>();
+                            mapDTO.setValue(logTailDo.getId());
+                            mapDTO.setLabel(logTailDo.getTail());
+                            return mapDTO;
+                        }).collect(Collectors.toList());
                 milogSpaceTreeDTO.setChildren(collect);
             }
             return milogSpaceTreeDTO;
         }).collect(Collectors.toList());
-        return Result.success(spaceTreeDTOS);
+        return Result.success(spaceTrees);
     }
-    
+
     /**
      * Query the store that originally belonged to the space, and query the 
authorized store
      *
@@ -132,21 +143,24 @@ public class HeralogHomePageServiceImpl implements 
HeralogHomePageService {
     @Override
     @Nullable
     public List<MilogLogStoreDO> getMilogLogStoreDOS(Long spaceId) {
-        List<MilogLogStoreDO> storeDOS = Lists.newArrayList();
+        List<MilogLogStoreDO> storeDoS = Lists.newArrayList();
         List<MilogLogStoreDO> stores = 
milogLogstoreDao.getMilogLogstoreBySpaceId(spaceId);
         List<MilogStoreSpaceAuth> storeSpaceAuths = 
milogStoreSpaceAuthDao.queryStoreIdsBySpaceId(spaceId);
         if (CollectionUtils.isNotEmpty(stores)) {
-            storeDOS = stores;
+            storeDoS = stores;
         }
         if (CollectionUtils.isNotEmpty(storeSpaceAuths)) {
             List<MilogLogStoreDO> collect = storeSpaceAuths.stream()
                     .map(storeSpaceAuth -> 
milogLogstoreDao.queryById(storeSpaceAuth.getStoreId()))
-                    .filter(Objects::nonNull).collect(Collectors.toList());
-            storeDOS.addAll(collect);
+                    .toList();
+            storeDoS.addAll(collect);
         }
-        return storeDOS;
+        if (CollectionUtils.isNotEmpty(storeDoS)) {
+            storeDoS = storeDoS.stream().filter(store -> null != store && 
commonExtensionService.matchesCondition(spaceId, FilterIdEnum.FILTER_STORE_ID, 
store.getId())).toList();
+        }
+        return storeDoS;
     }
-    
+
     @Override
     public Result<List<ValueDTO<String>>> getMiloglogAccessPattern() {
         return new Result<>(CommonError.Success.getCode(), 
CommonError.Success.getMessage(), milogpattern);
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 712ba725..f3bc6d45 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -302,7 +302,12 @@ public class MilogConfigNacosServiceImpl implements 
MilogConfigNacosService {
 
     private synchronized MilogSpaceData dealSpaceConfigByRule(
             String motorRoomEn, Long spaceId, Long storeId, Long tailId, 
Integer type, String changeType) {
-        MilogSpaceData existConfig = 
getSpaceConfigNacosProvider(motorRoomEn).getConfig(spaceId);
+        SpaceConfigNacosProvider configNacosProvider = 
getSpaceConfigNacosProvider(motorRoomEn);
+        if (null == configNacosProvider) {
+            log.error("configNacosProvider is null,contact 
us,motorRoomEn:{},tailId:{},changeType:{}", motorRoomEn, tailId, changeType);
+            return null;
+        }
+        MilogSpaceData existConfig = configNacosProvider.getConfig(spaceId);
         // new configuration
         if (null == existConfig || 
OperateEnum.ADD_OPERATE.getCode().equals(type)) {
             // The configuration is not configured yet, initialize the 
configuration
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
index f2aa07b0..b82efba4 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
@@ -21,6 +21,7 @@ package org.apache.ozhera.log.manager.service.nacos.impl;
 import com.alibaba.nacos.api.config.ConfigService;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
 import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
 import org.apache.ozhera.log.model.MilogSpaceData;
@@ -47,6 +48,10 @@ public class SpaceConfigNacosPublisher implements 
DynamicConfigPublisher<MilogSp
         String dataId = 
CommonExtensionServiceFactory.getCommonExtensionService().getLogManagePrefix() 
+ TAIL_CONFIG_DATA_ID + uniqueSpace;
         try {
             if (null != configService) {
+                if (StringUtils.isBlank(dataId)) {
+                    log.error("dataId is null,uniqueSpace:{},config:{}", 
uniqueSpace, configJson);
+                    return;
+                }
                 configService.publishConfig(dataId, DEFAULT_GROUP_ID, 
gson.toJson(config));
             } else {
                 log.warn("configService is null,uniqueSpace:{},config:{}", 
uniqueSpace, configJson);
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
index 2768249c..124c7f17 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
@@ -49,7 +49,12 @@ public class StreamConfigNacosProvider implements 
DynamicConfigProvider<MiLogStr
             if (null == spaceId) {
                 rules = 
configService.getConfig(CommonExtensionServiceFactory.getCommonExtensionService().getLogManagePrefix()
 + NAMESPACE_CONFIG_DATA_ID, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
             } else {
-                rules = 
configService.getConfig(CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId),
 DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
+                String spaceDataId = 
CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId);
+                if (StringUtils.isBlank(spaceDataId)) {
+                    log.error("The space does not exist, please check the 
spaceId,spaceId:{}", spaceId);
+                    return null;
+                }
+                rules = configService.getConfig(spaceDataId, DEFAULT_GROUP_ID, 
DEFAULT_TIME_OUT_MS);
 
             }
             log.info("The NACOS query log is initially configured:{}", rules);
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
index 2682d64d..fddd0525 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
@@ -23,6 +23,7 @@ import com.alibaba.nacos.api.exception.NacosException;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
 import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
 import org.apache.ozhera.log.model.MiLogStreamConfig;
@@ -49,7 +50,12 @@ public class StreamConfigNacosPublisher implements 
DynamicConfigPublisher<MiLogS
             return;
         }
         try {
-            
configService.publishConfig(CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId),
 DEFAULT_GROUP_ID, gson.toJson(config));
+            String spaceDataId = 
CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId);
+            if (StringUtils.isBlank(spaceDataId)) {
+                log.error("spaceDataId is null,spaceId:{}", spaceId);
+                return;
+            }
+            configService.publishConfig(spaceDataId, DEFAULT_GROUP_ID, 
gson.toJson(config));
         } catch (NacosException e) {
             log.error("Create namespace push data exceptions, parameters:{}", 
gson.toJson(config), e);
         }
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
index bb31de93..8088df7c 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
@@ -131,7 +131,7 @@ public class LogDataTransfer {
     private Map<String, Object> parseMessage(LineMessage lineMessage) {
         String ip = lineMessage.getProperties(LineMessage.KEY_IP);
         Long lineNumber = lineMessage.getLineNumber();
-        Map<String, Object> dataMap = 
logParser.parse(lineMessage.getMsgBody(), ip, lineNumber, 
lineMessage.getTimestamp(), lineMessage.getFileName());
+        Map<String, Object> dataMap = 
logParser.parse(lineMessage.getMsgBody(), ip, lineNumber, 
lineMessage.extractTimestamp(), lineMessage.getFileName());
         putCommonData(dataMap);
         return dataMap;
     }
@@ -140,12 +140,6 @@ public class LogDataTransfer {
         return objectMapper.readValue(msg, LineMessage.class);
     }
 
-    public static void main(String[] args) throws JsonProcessingException {
-        String message = 
"{\"extMap\":{\"ct\":\"1756780824719\",\"ip\":\"10.7.84.220\",\"tag\":\"tags_392_120935_132193\",\"type\":\"1\"},\"fileName\":\"/home/work/log/nr-promotion-promotion-admin-global-1080348-c-67bbfb479c-27t45/promotion-admin-global/server.log\",\"lineNumber\":2642,\"msgBody\":\"2025-09-02
 10:40:21,610|INFO 
|4a1378575e03c5d38fa9fd0f7351227a|call_client213|c.x.n.p.a.g.i.u.filter.DubboCommonFilter|dubbo
 invoke. service:com.xiaomi.nr.promotion.admin.global.impl.DubboHea [...]
-        LineMessage lineMessage = new ObjectMapper().readValue(message, 
LineMessage.class);
-        System.out.println(lineMessage.getTimestamp());
-    }
-
     private void putCommonData(Map<String, Object> dataMap) {
         dataMap.putIfAbsent(LOG_STREAM_SPACE_ID, 
sinkJobConfig.getLogSpaceId());
         dataMap.putIfAbsent(LOG_STREAM_STORE_ID, 
sinkJobConfig.getLogStoreId());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to