This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new fc39363a refactor: log monitoring modification and content parsing 
optimization (#561)
fc39363a is described below

commit fc39363a941a23355e95622d9feb9817ab6cdbf9
Author: wtt <[email protected]>
AuthorDate: Wed Mar 12 18:58:16 2025 +0800

    refactor: log monitoring modification and content parsing optimization 
(#561)
    
    * refactor(log): refactoring log parsing logic
    
    * refactor: reconstruct file monitoring logic and optimize log parsing 
function
    
    * refactor: Optimize code comments for EsDataServiceImpl class
    
    * refactor: optimize code comments for EsDataServiceImpl class
---
 .../ozhera/log/agent/channel/ChannelEngine.java    |  58 ++++---
 .../log/agent/channel/ChannelServiceImpl.java      |  43 ++---
 .../agent/channel/WildcardChannelServiceImpl.java  |   2 +-
 .../ozhera/log/agent/channel/file/FileMonitor.java |   2 +-
 .../agent/channel/file/InodeFileComparator.java    |  42 +++--
 .../channel/file/LogFileAlterationObserver.java    |  49 ++++--
 .../listener/DefaultFileMonitorListener.java       |  56 +++---
 .../channel/listener/FileMonitorListener.java      |   6 +-
 .../log/agent/channel/memory/ChannelMemory.java    |   2 +-
 .../ozhera/log/agent/common/ChannelUtil.java       |   2 +-
 .../ozhera/log/agent/common/ExecutorUtil.java      |   4 +-
 .../apache/ozhera/log/agent/FilterMonitorTest.java |  10 +-
 .../org/apache/ozhera/log/model/LogtailConfig.java |   3 +
 .../org/apache/ozhera/log/model/SinkConfig.java    |   1 +
 .../apache/ozhera/log/parse/AbstractLogParser.java |  14 +-
 .../apache/ozhera/log/parse/CustomLogParser.java   |  10 +-
 .../org/apache/ozhera/log/parse/JsonLogParser.java |  32 ++--
 .../org/apache/ozhera/log/parse/LogParser.java     |   8 +-
 .../apache/ozhera/log/parse/LogParserFactory.java  |   8 +-
 .../ozhera/log/parse/SeparatorLogParser.java       |   7 +-
 .../org/apache/ozhera/log/utils/DateUtils.java     | 188 ++++++++++++++-------
 .../apache/ozhera/log/common/LogParserTest.java    |  39 +++--
 .../manager/service/impl/EsDataServiceImpl.java    | 107 ++++++++++--
 .../log/stream/config/MilogConfigListener.java     |  11 +-
 .../apache/ozhera/log/stream/job/JobManager.java   |   8 +-
 .../ozhera/log/stream/job/LogDataTransfer.java     |  13 +-
 .../ozhera/log/stream/plugin/es/EsPlugin.java      |   2 +-
 27 files changed, 477 insertions(+), 250 deletions(-)

diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index a860f1c7..c6312672 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ozhera.log.agent.channel;
 
+import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.lang.Pair;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -26,6 +27,14 @@ import com.xiaomi.data.push.common.SafeRun;
 import com.xiaomi.data.push.rpc.RpcClient;
 import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
 import com.xiaomi.mone.file.ILogFile;
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.anno.Lookup;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.plugin.config.Config;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.agent.channel.comparator.*;
 import org.apache.ozhera.log.agent.channel.listener.DefaultFileMonitorListener;
 import org.apache.ozhera.log.agent.channel.listener.FileMonitorListener;
@@ -45,14 +54,6 @@ import org.apache.ozhera.log.api.enums.OperateEnum;
 import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
 import org.apache.ozhera.log.common.Constant;
 import org.apache.ozhera.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Lookup;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.Config;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import java.text.NumberFormat;
 import java.util.*;
@@ -63,6 +64,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.ozhera.log.common.Constant.GSON;
+import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
 import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
 import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
 
@@ -214,7 +216,7 @@ public class ChannelEngine {
             log.info("realChannelService,id:{}", channelId);
             try {
                 channelService.start();
-                fileMonitorListener.addChannelService(channelService);
+                fileMonitorListener.addChannelService(abstractChannelService);
                 successChannelIds.add(channelId);
             } catch (RejectedExecutionException e) {
                 log.error("The thread pool is full.id:{}", channelId, e);
@@ -266,21 +268,32 @@ public class ChannelEngine {
             if (null == agentMemoryService) {
                 agentMemoryService = new 
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
 AgentMemoryService.DEFAULT_BASE_PATH));
             }
-            ChannelService channelService;
-            Input input = channelDefine.getInput();
-            boolean matchWildcard = 
Arrays.stream(input.getLogPattern().split(",")).anyMatch(data -> 
StringUtils.substringAfterLast(data, SEPARATOR).contains(PATH_WILDCARD));
-            if (matchWildcard) {
-                channelService = new WildcardChannelServiceImpl(exporter, 
agentMemoryService, channelDefine, filterChain, memoryBasePath);
-            } else {
-                channelService = new ChannelServiceImpl(exporter, 
agentMemoryService, channelDefine, filterChain);
-            }
-            return channelService;
+            return createChannelService(channelDefine, exporter, filterChain);
         } catch (Throwable e) {
-            log.error("channelServiceTrans exception, channelDefine:{}, 
exception:{}", gson.toJson(channelDefine), e);
+            log.error("channelServiceTrans exception, channelDefine:{}", 
gson.toJson(channelDefine), e);
         }
         return null;
     }
 
+    private ChannelService createChannelService(ChannelDefine channelDefine, 
MsgExporter exporter, FilterChain filterChain) {
+        Input input = channelDefine.getInput();
+        String logType = channelDefine.getInput().getType();
+        boolean containsWildcard = 
isWildcardAllowedForLogType(input.getLogPattern(), logType);
+        if (containsWildcard || FileUtil.exist(input.getLogPattern())) {
+            return new ChannelServiceImpl(exporter, agentMemoryService, 
channelDefine, filterChain);
+        } else {
+            return new WildcardChannelServiceImpl(exporter, 
agentMemoryService, channelDefine, filterChain, memoryBasePath);
+        }
+    }
+
+    private boolean isWildcardAllowedForLogType(String logPattern, String 
logType) {
+        if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType)) {
+            return true;
+        }
+        return Arrays.stream(logPattern.split(SYMBOL_COMMA))
+                .noneMatch(data -> StringUtils.substringAfterLast(data, 
SEPARATOR).contains(PATH_WILDCARD));
+    }
+
     private void preCheckChannelDefine(ChannelDefine channelDefine) {
         Preconditions.checkArgument(null != channelDefine, "channelDefine can 
not be null");
         Preconditions.checkArgument(null != channelDefine.getInput(), 
"channelDefine.input can not be null");
@@ -434,14 +447,15 @@ public class ChannelEngine {
         try {
             if (directDel || CollectionUtils.isNotEmpty(channelDels)) {
                 log.info("[delete config]data:{}", gson.toJson(channelDels));
-                List<Long> channelIdDels = 
channelDels.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
+                List<Long> channelIdDels = 
channelDels.stream().map(ChannelDefine::getChannelId).toList();
                 List<ChannelService> tempChannelServiceList = 
Lists.newArrayList();
                 channelServiceList.forEach(channelService -> {
-                    Long channelId = ((AbstractChannelService) 
channelService).getChannelDefine().getChannelId();
+                    AbstractChannelService abstractChannelService = 
(AbstractChannelService) channelService;
+                    Long channelId = 
abstractChannelService.getChannelDefine().getChannelId();
                     if (channelIdDels.contains(channelId)) {
                         log.info("[delete config]channelService:{}", 
channelId);
                         channelService.close();
-                        
fileMonitorListener.removeChannelService(channelService);
+                        
fileMonitorListener.removeChannelService(abstractChannelService);
                         tempChannelServiceList.add(channelService);
                         this.channelDefineList.removeIf(channelDefine -> {
                             if 
(channelDefine.getChannelId().equals(channelId)) {
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index bcf8c3aa..3d976ddd 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -42,7 +42,6 @@ import org.apache.ozhera.log.api.enums.K8sPodTypeEnum;
 import org.apache.ozhera.log.api.enums.LogTypeEnum;
 import org.apache.ozhera.log.api.model.meta.FilterConf;
 import org.apache.ozhera.log.api.model.msg.LineMessage;
-import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.common.Constant;
 import org.apache.ozhera.log.common.PathUtils;
 import org.apache.ozhera.log.utils.NetUtil;
@@ -66,10 +65,11 @@ import static 
org.apache.ozhera.log.common.PathUtils.SEPARATOR;
 @Slf4j
 public class ChannelServiceImpl extends AbstractChannelService {
 
-    private AgentMemoryService memoryService;
+    private final AgentMemoryService memoryService;
 
     private MsgExporter msgExporter;
 
+    @Getter
     private ChannelDefine channelDefine;
 
     private ChannelMemory channelMemory;
@@ -80,7 +80,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     @Getter
     private final ConcurrentHashMap<String, Future> futureMap = new 
ConcurrentHashMap<>();
 
-    private Set<String> delFileCollList = new CopyOnWriteArraySet<>();
+    private final Set<String> delFileCollList = new CopyOnWriteArraySet<>();
 
     private final Map<String, Long> reOpenMap = new HashMap<>();
     private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
@@ -89,13 +89,13 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private ScheduledFuture<?> lastFileLineScheduledFuture;
 
-    private Gson gson = Constant.GSON;
+    private final Gson gson = Constant.GSON;
 
-    private List<LineMessage> lineMessageList = new ArrayList<>();
+    private final List<LineMessage> lineMessageList = new ArrayList<>();
 
-    private ReentrantLock fileColLock = new ReentrantLock();
+    private final ReentrantLock fileColLock = new ReentrantLock();
 
-    private ReentrantLock fileReopenLock = new ReentrantLock();
+    private final ReentrantLock fileReopenLock = new ReentrantLock();
 
     private volatile long lastSendTime = System.currentTimeMillis();
 
@@ -108,19 +108,15 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
      */
     private boolean collectOnce;
 
-    private FilterChain chain;
+    private final FilterChain chain;
 
     /**
      * The file path to monitor
      */
-    private List<MonitorFile> monitorFileList;
+    private final List<MonitorFile> monitorFileList;
 
     private LogTypeEnum logTypeEnum;
 
-    private String logPattern;
-
-    private String logSplitExpress;
-
     private String linePrefix;
 
     public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService 
memoryService, ChannelDefine channelDefine, FilterChain chain) {
@@ -171,8 +167,8 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         Long channelId = channelDefine.getChannelId();
         Input input = channelDefine.getInput();
 
-        this.logPattern = input.getLogPattern();
-        this.logSplitExpress = input.getLogSplitExpress();
+        String logPattern = input.getLogPattern();
+        String logSplitExpress = input.getLogSplitExpress();
         this.linePrefix = input.getLinePrefix();
 
         String logType = channelDefine.getInput().getType();
@@ -312,9 +308,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                 return;
             }
             long ct = System.currentTimeMillis();
-            readResult.get().getLines().stream()
-                    .filter(l -> 
!shouldFilterLogs(channelDefine.getFilterLogLevelList(), l))
-                    .forEach(l -> {
+            readResult.get().getLines().stream().forEach(l -> {
                 String logType = channelDefine.getInput().getType();
                 LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
                 // Multi-line application log type and opentelemetry type are 
used to determine the exception stack
@@ -354,7 +348,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                         try {
                             String remainMsg = mLog.takeRemainMsg2();
                             if (null != remainMsg) {
-                                log.info("start send last 
line,pattern:{},patternCode:{},data:{}", pattern, patternCode, remainMsg);
+                                log.info("start send last 
line,pattern:{},patternCode:{},ip:{},data:{}", pattern, patternCode, 
getTailPodIp(pattern), remainMsg);
                                 wrapDataToSend(remainMsg, 
referenceEntry.getValue().getValue(), pattern, patternCode, 
getTailPodIp(pattern), appendTime);
                             }
                         } finally {
@@ -441,7 +435,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         }
 
         if (!collectOnce && !snapshot.isEmpty()) {
-            String jsonMap = gson.toJson(snapshot);
+            String jsonMap = gson.toJson(snapshot.keySet());
             log.info("fileProgressMap: {}", jsonMap);
         }
     }
@@ -505,7 +499,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     @Override
     public void close() {
-        log.info("Delete the current collection task,channelId:{}", 
getChannelId());
+        log.info("Delete the current collection 
task,channelId:{},logPattern:{}", getChannelId(), 
getChannelDefine().getInput().getLogPattern());
         //1.Stop log capture
         for (Map.Entry<String, ILogFile> fileEntry : logFileMap.entrySet()) {
             fileEntry.getValue().setStop(true);
@@ -525,7 +519,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         for (Future future : futureMap.values()) {
             future.cancel(false);
         }
-        log.info("stop file monitor,fileName:", 
logFileMap.keySet().stream().collect(Collectors.joining(SYMBOL_COMMA)));
+        log.info("stop file monitor,fileName:{}", String.join(SYMBOL_COMMA, 
logFileMap.keySet()));
         lineMessageList.clear();
         reOpenMap.clear();
         fileReadMap.clear();
@@ -615,11 +609,6 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
         return monitorFileList;
     }
 
-    @Override
-    public ChannelDefine getChannelDefine() {
-        return channelDefine;
-    }
-
     public ChannelMemory getChannelMemory() {
         return channelMemory;
     }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index b9ad7963..2db94d8f 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -222,7 +222,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
                 .distinct()
                 .toList();
         return expressions.size() == 1 ?
-                expressions.get(0) :
+                expressions.getFirst() :
                 expressions.stream().collect(Collectors.joining("|", 
MULTI_FILE_PREFIX, MULTI_FILE_SUFFIX));
     }
 
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
index f6b31c76..8af23de3 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
@@ -42,7 +42,7 @@ public class FileMonitor implements FileWatcher {
         FileAlterationMonitor monitor = new FileAlterationMonitor(10000);
         log.info("agent monitor files:{}", GSON.toJson(watchList));
         for (String watch : watchList) {
-            FileAlterationObserver observer = new 
LogFileAlterationObserver(new File(watch));
+            FileAlterationObserver observer = new 
LogFileAlterationObserver(new File(watch), File::exists);
             observer.addListener(new FileListener(consumer));
             log.info("## agent monitor file:{}, filePattern:{}", watch, 
filePattern);
             monitor.addObserver(observer);
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
index 117cbb53..7713c4a4 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
@@ -19,11 +19,11 @@
 package org.apache.ozhera.log.agent.channel.file;
 
 import com.google.common.collect.Lists;
-import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
-import org.apache.ozhera.log.agent.common.ChannelUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.comparator.DefaultFileComparator;
 import org.apache.commons.io.comparator.NameFileComparator;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import org.apache.ozhera.log.agent.common.ChannelUtil;
 
 import java.io.File;
 import java.util.*;
@@ -49,33 +49,41 @@ public class InodeFileComparator extends 
DefaultFileComparator {
 
     @Override
     public int compare(File file1, File file2) {
+//        log.info("InodeFileComparator compare 
file1:{},file2:{},filePaths:{}", file1, file2, GSON.toJson(filePaths));
         int sort = fileComparator.compare(file1, file2);
-        if (file1.isDirectory() || file2.isDirectory()) {
-            return sort;
-        }
-        if (sort == 0 && filePaths.contains(file1.getAbsolutePath())) {
-            //The file name is the same
-            Long oldInode;
-            if (INODE_MAP.containsKey(file1.getAbsolutePath())) {
-                oldInode = INODE_MAP.get(file1.getAbsolutePath());
-            } else {
-                oldInode = 
ChannelUtil.buildUnixFileNode(file1.getAbsolutePath()).getSt_ino();
-                INODE_MAP.put(file1.getAbsolutePath(), oldInode);
+        try {
+            if (file1.isDirectory() || file2.isDirectory()) {
+                return sort;
             }
-            ChannelMemory.UnixFileNode unixFileNode2 = 
ChannelUtil.buildUnixFileNode(file2.getAbsolutePath());
-            if (!Objects.equals(oldInode, unixFileNode2.getSt_ino())) {
-                INODE_MAP.put(file2.getAbsolutePath(), 
unixFileNode2.getSt_ino());
-                return 1;
+            if (sort == 0 && filePaths.contains(file1.getAbsolutePath())) {
+                //The file name is the same
+//                log.info("INODE_MAP:{}", GSON.toJson(INODE_MAP));
+                Long oldInode;
+                if (INODE_MAP.containsKey(file1.getAbsolutePath())) {
+                    oldInode = INODE_MAP.get(file1.getAbsolutePath());
+                } else {
+                    oldInode = 
ChannelUtil.buildUnixFileNode(file1.getAbsolutePath()).getSt_ino();
+                    INODE_MAP.put(file1.getAbsolutePath(), oldInode);
+                }
+                ChannelMemory.UnixFileNode unixFileNode2 = 
ChannelUtil.buildUnixFileNode(file2.getAbsolutePath());
+                if (!Objects.equals(oldInode, unixFileNode2.getSt_ino())) {
+                    INODE_MAP.put(file2.getAbsolutePath(), 
unixFileNode2.getSt_ino());
+                    return 1;
+                }
             }
+        } catch (Exception e) {
+            log.error("InodeFileComparator compare error,file1:{},file2:{}", 
file1, file2, e);
         }
         return sort;
     }
 
     public static void addFile(String filePath) {
+        log.info("InodeFileComparator add file : {}", filePath);
         filePaths.add(filePath);
     }
 
     public static void removeFile(String filePath) {
+        log.info("InodeFileComparator remove file : {}", filePath);
         filePaths.remove(filePath);
     }
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
index 42d5a0bb..5fda5b01 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
@@ -84,26 +84,31 @@ public class LogFileAlterationObserver extends 
FileAlterationObserver {
 
     @Override
     public void checkAndNotify() {
+        try {
+            /* fire onStart() */
+            for (FileAlterationListener listener : getListeners()) {
+                listener.onStart(this);
+            }
 
-        /* fire onStart() */
-        for (FileAlterationListener listener : getListeners()) {
-            listener.onStart(this);
-        }
-
-        /* fire directory/file events */
-        File rootFile = getDirectory();
-        if (null == childRootEntry) {
-            childRootEntry = (FileEntry) ReflectUtil.getFieldValue(this, 
"rootEntry");
-        }
-        if (rootFile.exists()) {
-            checkAndNotify(childRootEntry, childRootEntry.getChildren(), 
listFiles(rootFile));
-        } else if (childRootEntry.isExists()) {
-            checkAndNotify(childRootEntry, childRootEntry.getChildren(), 
FileUtils.EMPTY_FILE_ARRAY);
-        }
+            /* fire directory/file events */
+            File rootFile = getDirectory();
+            if (null == childRootEntry) {
+                childRootEntry = (FileEntry) ReflectUtil.getFieldValue(this, 
"rootEntry");
+            }
+            if (rootFile.exists()) {
+//                log.info("LogFileAlterationObserver checkAndNotify 
rootFile:{}", rootFile.getAbsolutePath());
+                checkAndNotify(childRootEntry, childRootEntry.getChildren(), 
listFiles(rootFile));
+            } else if (childRootEntry.isExists()) {
+//                log.info("LogFileAlterationObserver checkAndNotify 
rootFile:{}", childRootEntry.getFile().getAbsolutePath());
+                checkAndNotify(childRootEntry, childRootEntry.getChildren(), 
FileUtils.EMPTY_FILE_ARRAY);
+            }
 
-        /* fire onStop() */
-        for (FileAlterationListener listener : getListeners()) {
-            listener.onStop(this);
+            /* fire onStop() */
+            for (FileAlterationListener listener : getListeners()) {
+                listener.onStop(this);
+            }
+        } catch (Exception e) {
+            log.error("LogFileAlterationObserver checkAndNotify", e);
         }
     }
 
@@ -112,6 +117,9 @@ public class LogFileAlterationObserver extends 
FileAlterationObserver {
         if (null == childComparator) {
             childComparator = (Comparator) ReflectUtil.getFieldValue(this, 
"comparator");
         }
+//        List<File> oldFileList = 
Arrays.stream(previous).map(FileEntry::getFile).collect(Collectors.toList());
+//        List<String> newFileList = 
Arrays.stream(files).map(File::getAbsolutePath).collect(Collectors.toList());
+//        log.info("childComparator:{},previous:{},files:{}", childComparator, 
GSON.toJson(oldFileList), GSON.toJson(newFileList));
         FileEntry[] current = files.length > 0 ? new FileEntry[files.length] : 
EMPTY_ENTRIES;
         for (FileEntry entry : previous) {
             while (c < files.length && 
childComparator.compare(entry.getFile(), files[c]) > 0) {
@@ -120,6 +128,7 @@ public class LogFileAlterationObserver extends 
FileAlterationObserver {
                 c++;
             }
             if (c < files.length && childComparator.compare(entry.getFile(), 
files[c]) == 0) {
+                doMatch(entry, files[c]);
                 checkAndNotify(entry, entry.getChildren(), 
listFiles(files[c]));
                 current[c] = entry;
                 c++;
@@ -134,6 +143,10 @@ public class LogFileAlterationObserver extends 
FileAlterationObserver {
         parent.setChildren(current);
     }
 
+    private void doMatch(final FileEntry entry, final File file) {
+        entry.refresh(file);
+    }
+
     private File[] listFiles(File file) {
         File[] children = null;
         if (file.isDirectory()) {
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
index ab56b129..c8a5cc60 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
@@ -22,23 +22,22 @@ import cn.hutool.core.io.FileUtil;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.agent.channel.AbstractChannelService;
 import org.apache.ozhera.log.agent.channel.ChannelService;
+import org.apache.ozhera.log.agent.channel.WildcardChannelServiceImpl;
 import org.apache.ozhera.log.agent.channel.file.FileMonitor;
 import org.apache.ozhera.log.agent.channel.file.MonitorFile;
 import org.apache.ozhera.log.agent.common.ChannelUtil;
 import org.apache.ozhera.log.agent.common.ExecutorUtil;
 import org.apache.ozhera.log.api.enums.LogTypeEnum;
 import org.apache.ozhera.log.common.PathUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.monitor.FileAlterationMonitor;
-import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
@@ -65,9 +64,9 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
      */
     List<String> pathList = new CopyOnWriteArrayList<>();
     /**
-     * Actual listener list
+     * Actual listener map
      */
-    private List<FileAlterationMonitor> monitorList = new 
CopyOnWriteArrayList();
+    private final Map<Long, List<FileAlterationMonitor>> monitorMap = new 
HashMap<>();
     /**
      * Each listening thread
      */
@@ -75,11 +74,13 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
     /**
      * Each ChannelService corresponds to the monitored file
      */
-    Map<List<MonitorFile>, ChannelService> pathChannelServiceMap = new 
ConcurrentHashMap<>();
+    private Map<List<MonitorFile>, ChannelService> pathChannelServiceMap = new 
ConcurrentHashMap<>();
 
     private final List<String> specialFileNameSuffixList = 
Lists.newArrayList("wf");
 
-    private static final int DEFAULT_FILE_SIZE = 100000;
+    private static final int DEFAULT_FILE_SIZE = 9000;
+
+    private static final long DEFAULT_CHANNEL_ID = 0L;
 
     public DefaultFileMonitorListener() {
         //Check if there are too many files, if there are more than 50,000 
files,
@@ -87,7 +88,8 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
         long size = getDefaultFileSize();
         log.info("defaultMonitorPath:{} file size:{}", defaultMonitorPath, 
size);
         if (size < DEFAULT_FILE_SIZE) {
-            this.startFileMonitor(defaultMonitorPath);
+            monitorMap.putIfAbsent(DEFAULT_CHANNEL_ID, new ArrayList<>());
+            this.startFileMonitor(DEFAULT_CHANNEL_ID, defaultMonitorPath);
             pathList.add(defaultMonitorPath);
         }
     }
@@ -110,15 +112,17 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
     }
 
     @Override
-    public void addChannelService(ChannelService channelService) {
+    public void addChannelService(AbstractChannelService channelService) {
         List<MonitorFile> monitorPathList = 
channelService.getMonitorPathList();
         if (CollectionUtils.isEmpty(monitorPathList)) {
             return;
         }
+        Long channelId = channelService.getChannelDefine().getChannelId();
+        monitorMap.putIfAbsent(channelId, new ArrayList<>());
         List<String> newMonitorDirectories = 
newMonitorDirectories(monitorPathList);
         for (String watchDirectory : newMonitorDirectories) {
             if (isValidWatch(watchDirectory)) {
-                startFileMonitor(watchDirectory);
+                startFileMonitor(channelId, watchDirectory);
                 pathList.add(watchDirectory);
             }
         }
@@ -171,35 +175,46 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
             }
         }
         newMonitorDirectories = 
newMonitorDirectories.stream().distinct().collect(Collectors.toList());
-        log.info("end newMonitorDirectories:", 
gson.toJson(newMonitorDirectories));
+        log.info("end newMonitorDirectories:{}", 
gson.toJson(newMonitorDirectories));
         return newMonitorDirectories;
     }
 
     @Override
-    public void removeChannelService(ChannelService channelService) {
+    public void removeChannelService(AbstractChannelService channelService) {
         try {
+            if (channelService instanceof WildcardChannelServiceImpl) {
+                return;
+            }
             pathChannelServiceMap.remove(channelService.getMonitorPathList());
             List<MonitorFile> monitorPathList = 
channelService.getMonitorPathList();
             List<String> newMonitorDirectories = 
newMonitorDirectories(monitorPathList);
             for (String watchDirectory : newMonitorDirectories) {
+                log.info("remove watchDirectory:{}", watchDirectory);
                 pathList.remove(watchDirectory);
                 if (scheduledFutureMap.containsKey(watchDirectory)) {
                     scheduledFutureMap.get(watchDirectory).cancel(true);
                 }
             }
+            List<FileAlterationMonitor> fileAlterationMonitors = 
monitorMap.get(channelService.getChannelDefine().getChannelId());
+            if (CollectionUtils.isNotEmpty(fileAlterationMonitors)) {
+                for (FileAlterationMonitor fileAlterationMonitor : 
fileAlterationMonitors) {
+                    fileAlterationMonitor.stop();
+                }
+            }
         } catch (Exception e) {
             log.error("removeChannelService file listener,monitorPathList:{}", 
gson.toJson(channelService.getMonitorPathList()), e);
         }
     }
 
-    public void startFileMonitor(String monitorFilePath) {
+    public void startFileMonitor(Long channelId, String monitorFilePath) {
         log.debug("startFileMonitor,monitorFilePath:{}", monitorFilePath);
         if (pathList.stream().anyMatch(monitorFilePath::startsWith)) {
             log.info("current path has 
started,monitorFilePath:{},pathList:{}", monitorFilePath, 
String.join(SYMBOL_COMMA, pathList));
             return;
         }
+        List<FileAlterationMonitor> fileAlterationMonitors = 
monitorMap.get(channelId);
         Future<?> fileMonitorFuture = ExecutorUtil.submit(() -> {
-            new FileMonitor().watch(monitorFilePath, monitorList, 
changedFilePath -> {
+            new FileMonitor().watch(monitorFilePath, fileAlterationMonitors, 
changedFilePath -> {
                 try {
                     if (FileUtil.isDirectory(changedFilePath)) {
                         return;
@@ -215,14 +230,13 @@ public class DefaultFileMonitorListener implements 
FileMonitorListener {
                     log.error("FileMonitor 
error,monitorFilePath:{},changedFilePath:{}", monitorFilePath, changedFilePath, 
e);
                 }
             });
+            monitorMap.put(channelId, fileAlterationMonitors);
         });
         scheduledFutureMap.put(monitorFilePath, fileMonitorFuture);
     }
 
     /**
      * Normal file change event handling
-     *
-     * @param changedFilePath
      */
     private void ordinaryFileChanged(String changedFilePath) {
         for (Map.Entry<List<MonitorFile>, ChannelService> channelServiceEntry 
: pathChannelServiceMap.entrySet()) {
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
index ea8726ac..4512342f 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
@@ -18,7 +18,7 @@
  */
 package org.apache.ozhera.log.agent.channel.listener;
 
-import org.apache.ozhera.log.agent.channel.ChannelService;
+import org.apache.ozhera.log.agent.channel.AbstractChannelService;
 
 /**
  * @author wtt
@@ -30,10 +30,10 @@ public interface FileMonitorListener {
     /**
      * Add
      */
-    void addChannelService(ChannelService channelService);
+    void addChannelService(AbstractChannelService channelService);
 
     /**
      * Delete
      */
-    void removeChannelService(ChannelService channelService);
+    void removeChannelService(AbstractChannelService channelService);
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
index 0a5523b1..b8c409ec 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
@@ -34,7 +34,7 @@ import java.util.Map;
 @Data
 public class ChannelMemory implements Serializable {
 
-    public transient static final String DEFAULT_VERSION = "2.0";
+    public static final String DEFAULT_VERSION = "2.0";
 
     private Long channelId;
 
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
index bce4362a..27a0f1a3 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
@@ -96,7 +96,7 @@ public class ChannelUtil {
                 return unixFileNode;
             }
         } catch (Exception e) {
-            log.info("buildUnixFileNode error,filePath:{}", filePath, e);
+            log.error("buildUnixFileNode error,filePath:{}", filePath, e);
         }
         return new ChannelMemory.UnixFileNode();
     }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index c7c78ffe..a9e94da3 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -47,9 +47,7 @@ public class ExecutorUtil {
     public static ExecutorService createPool() {
         System.setProperty("jdk.virtualThreadScheduler.parallelism", 
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
         ThreadFactory factory = 
Thread.ofVirtual().name("ExecutorUtil-TP-Virtual-Thread", 0)
-                .uncaughtExceptionHandler((t, e) -> {
-                    log.error("ExecutorUtil-TP-Virtual-Thread 
uncaughtException:{}", e.getMessage(), e);
-                }).factory();
+                .uncaughtExceptionHandler((t, e) -> 
log.error("ExecutorUtil-TP-Virtual-Thread uncaughtException:{}", 
e.getMessage(), e)).factory();
         return Executors.newThreadPerTaskExecutor(factory);
     }
 
diff --git 
a/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
 
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
index 2e61fd89..902d0fcc 100644
--- 
a/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
+++ 
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
@@ -20,10 +20,11 @@ package org.apache.ozhera.log.agent;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
-import org.apache.ozhera.log.agent.channel.file.FileListener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.ozhera.log.agent.channel.file.FileListener;
+import org.apache.ozhera.log.agent.channel.file.LogFileAlterationObserver;
 import org.junit.Test;
 
 import java.io.File;
@@ -74,16 +75,17 @@ public class FilterMonitorTest {
 //                continue;
 //            }
 
-            FileAlterationObserver observer = new FileAlterationObserver(new 
File(watch));
+//            FileAlterationObserver observer = new FileAlterationObserver(new 
File(watch));
+            FileAlterationObserver observer = new 
LogFileAlterationObserver(new File(watch), file -> file.exists() && 
file.isFile());
             observer.addListener(new FileListener(consumer));
 
-            log.info("## agent monitor file:{}, filePattern:{}", watch);
+            log.info("## agent monitor file:{}", watch);
             monitor.addObserver(observer);
         }
 
         try {
             monitor.start();
-            log.info("## agent monitor filePattern:{} started");
+            log.info("## agent monitor started");
         } catch (Exception e) {
             log.error(String.format("agent file monitor start err,monitor 
filePattern:%s"), e);
         }
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
index 5dbf8aea..b25f85ec 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
@@ -32,6 +32,7 @@ public class LogtailConfig {
     private String ak;
     private String sk;
     private String clusterInfo;
+    @EqualsAndHashCode.Exclude
     private String consumerGroup;
     private String topic;
     private String tag;
@@ -41,6 +42,8 @@ public class LogtailConfig {
     private String deploySpace;
 
     private Integer parseType;
+
+    @EqualsAndHashCode.Exclude
     private String tail;
     /**
      * Log delimiter
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
index ce991a0e..4ca08b0c 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
@@ -30,6 +30,7 @@ import java.util.List;
 @EqualsAndHashCode
 public class SinkConfig {
     private Long logstoreId;
+    @EqualsAndHashCode.Exclude
     private String logstoreName;
     /**
      * timestamp is required
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index 09acb66c..50ad6a3f 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.utils.IndexUtils;
+import org.apache.ozhera.log.utils.NetUtil;
 
 import java.util.*;
 import java.util.function.Function;
@@ -43,6 +44,8 @@ public abstract class AbstractLogParser implements LogParser {
 
     private List<FieldInterceptor> fieldInterceptors = Lists.newArrayList();
 
+    private String localIp = NetUtil.getLocalIp();
+
     protected Map<String, Integer> valueMap;
 
     public AbstractLogParser(LogParserData parserData) {
@@ -88,13 +91,20 @@ public abstract class AbstractLogParser implements 
LogParser {
         validRet(parseData, logData);
 
         fieldInterceptors.forEach(fieldInterceptor -> 
fieldInterceptor.postProcess(parseData));
+        addCommonData(parseData);
+
         return parseData;
     }
 
+    private void addCommonData(Map<String, Object> data) {
+        data.putIfAbsent(parse_time, System.currentTimeMillis());
+        data.putIfAbsent(parse_ip, localIp);
+    }
+
     @Override
     public Map<String, Object> parseSimple(String logData, Long collectStamp) {
         Map<String, Object> parseData = doParseSimple(logData, collectStamp);
-        fieldInterceptors.stream().forEach(fieldInterceptor -> 
fieldInterceptor.postProcess(parseData));
+        fieldInterceptors.forEach(fieldInterceptor -> 
fieldInterceptor.postProcess(parseData));
         return parseData;
     }
 
@@ -133,7 +143,7 @@ public abstract class AbstractLogParser implements 
LogParser {
 
     void wrapMap(Map<String, Object> ret, LogParserData parserData, String ip,
                  Long lineNum, String fileName, Long collectStamp) {
-        ret.putIfAbsent(esKeyMap_timestamp, null == collectStamp ? 
getTimestampFromString("", collectStamp) : collectStamp);
+        ret.putIfAbsent(esKeyMap_timestamp, null == collectStamp ? 
System.currentTimeMillis() : collectStamp);
         ret.put(esKeyMap_topic, parserData.getTopicName());
         ret.put(esKeyMap_tag, parserData.getMqTag());
         ret.put(esKeyMap_logstoreName, parserData.getLogStoreName());
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
index a74b51ce..6352e404 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
@@ -21,10 +21,10 @@ package org.apache.ozhera.log.parse;
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.StrUtil;
 import com.google.gson.Gson;
-import org.apache.ozhera.log.utils.IndexUtils;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -89,16 +89,10 @@ public class CustomLogParser extends AbstractLogParser {
             if 
(ret.values().stream().map(String::valueOf).anyMatch(StringUtils::isEmpty)) {
                 ret.put(ES_KEY_MAP_LOG_SOURCE, originLog);
             }
-            /**
-             * Pocket does not include esKeyMap_timestamp, esKeyMap_topic, 
esKeyMap_tag, esKeyMap_logstoreName
-             */
-            if (ret.containsKey(esKeyMap_timestamp)) {
-                Long time = 
getTimestampFromString(ret.get(esKeyMap_timestamp).toString(), collectStamp);
-                ret.put(esKeyMap_timestamp, time);
-            }
         } catch (Exception e) {
             ret.put(ES_KEY_MAP_LOG_SOURCE, originData);
         }
+        validTimestamp(ret, collectStamp);
         return ret;
     }
 
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
index 9e744c11..74b0943f 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
@@ -61,25 +61,31 @@ public class JsonLogParser extends AbstractLogParser {
         try {
 //            Map<String, Object> rawLogMap = GSON.fromJson(logData, 
token.getType());
             Map<String, Object> rawLogMap = flattenJson(logData);
-            // The complete set of index column names
-            List<String> keyNameList = 
IndexUtils.getKeyListSlice(parserData.getKeyList());
-            // An index subset that marks whether the index column name at the 
corresponding location is referenced in the current tail
-            int[] valueIndexList = 
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
-            for (int i = 0; i < keyNameList.size(); i++) {
-                // Skip unreferenced keys
-                if (i >= valueIndexList.length || valueIndexList[i] == -1) {
-                    continue;
+            if (null != valueMap && !valueMap.isEmpty()) {
+                for (String key : valueMap.keySet()) {
+                    ret.put(key, rawLogMap.getOrDefault(key, ""));
+                }
+            } else {
+                // The complete set of index column names
+                List<String> keyNameList = 
IndexUtils.getKeyListSlice(parserData.getKeyList());
+                // An index subset that marks whether the index column name at 
the corresponding location is referenced in the current tail
+                int[] valueIndexList = 
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
+                for (int i = 0; i < keyNameList.size(); i++) {
+                    // Skip unreferenced keys
+                    if (i >= valueIndexList.length || valueIndexList[i] == -1) 
{
+                        continue;
+                    }
+                    String currentKey = keyNameList.get(i);
+                    String value = rawLogMap.getOrDefault(currentKey, 
"").toString();
+                    ret.put(currentKey, StringUtils.isNotEmpty(value) ? 
value.trim() : value);
                 }
-                String currentKey = keyNameList.get(i);
-                String value = rawLogMap.getOrDefault(currentKey, 
"").toString();
-                ret.put(currentKey, StringUtils.isNotEmpty(value) ? 
value.trim() : value);
             }
-            //timestamp
-            validTimestamp(ret, collectStamp);
         } catch (Exception e) {
             // If an exception occurs, the original log is kept to the 
logsource field
             ret.put(ES_KEY_MAP_LOG_SOURCE, logData);
         }
+        //timestamp
+        validTimestamp(ret, collectStamp);
         return ret;
     }
 
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
index 946bbdae..3db5b237 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
@@ -21,10 +21,12 @@ package org.apache.ozhera.log.parse;
 import cn.hutool.core.date.DateUtil;
 import org.apache.commons.lang3.time.DateParser;
 import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.ozhera.log.utils.DateUtils;
 
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * @Author: wtt
@@ -49,6 +51,8 @@ public interface LogParser {
     String specialTimePrefix = "20";
 
     String esKeyMap_timestamp = "timestamp";
+    String parse_time = "parse_time";
+    String parse_ip = "parse_ip";
     String esKeyMap_Date = "Date";
     String esKeyMap_topic = "mqtopic";
     String esKeyMap_tag = "mqtag";
@@ -79,11 +83,11 @@ public interface LogParser {
     default Long getTimestampFromString(String logTime, Long collectStamp) {
         Long timeStamp;
         try {
-            timeStamp = DateUtil.parse(logTime).getTime();
+            timeStamp = 
Objects.requireNonNull(DateUtils.parse(logTime)).getTime();
         } catch (Exception e) {
             try {
                 logTime = String.format("%s%s", 
String.valueOf(DateUtil.thisYear()).substring(0, 2), logTime);
-                timeStamp = DateUtil.parse(logTime).getTime();
+                timeStamp = DateUtils.parse(logTime).getTime();
             } catch (Exception ex) {
                 timeStamp = collectStamp;
             }
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
index 6e4ea33f..0c0a634e 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
@@ -34,10 +34,6 @@ public class LogParserFactory {
     private LogParserFactory() {
     }
 
-    public static LogParser getLogParser(Integer parseType, String keyList, 
String valueList, String parseScript, String keyOrderList) {
-        return LogParserFactory.getLogParser(parseType, keyList, valueList, 
parseScript, "", "", "", "", keyOrderList);
-    }
-
     public static LogParser getLogParser(Integer parseType, String keyList, 
String valueList, String parseScript,
                                          String topicName, String tailName, 
String mqTag, String logStoreName, String keyOrderList) {
         LogParserData logParserData = LogParserData.builder().keyList(keyList)
@@ -68,6 +64,10 @@ public class LogParserFactory {
         }
     }
 
+    public static LogParser getLogParser(Integer parseType, String keyList, 
String valueList, String parseScript, String keyOrderList) {
+        return LogParserFactory.getLogParser(parseType, keyList, valueList, 
parseScript, "", "", "", "", keyOrderList);
+    }
+
     @Getter
     public enum LogParserEnum {
 
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
index 0baa3a9f..0a630f96 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
@@ -20,12 +20,9 @@ package org.apache.ozhera.log.parse;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.ozhera.log.utils.IndexUtils;
 
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * @author wtt
@@ -128,8 +125,7 @@ public class SeparatorLogParser extends AbstractLogParser {
                         value = "";
                     }
                     if (kTsplit[0].equals(esKeyMap_timestamp) || 
kTsplit[1].equalsIgnoreCase(esKeyMap_Date)) {
-                        Long time = getTimestampFromString(value, 
collectStamp);
-                        ret.put(esKeyMap_timestamp, time);
+                        ret.put(esKeyMap_timestamp, value);
                     } else {
                         ret.put(kTsplit[0], StringUtils.isNotEmpty(value) ? 
value.trim() : value);
                     }
@@ -146,6 +142,7 @@ public class SeparatorLogParser extends AbstractLogParser {
         } catch (Exception e) {
             ret.put(ES_KEY_MAP_LOG_SOURCE, logData);
         }
+        validTimestamp(ret, collectStamp);
         return ret;
     }
 
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
index 8dd1019a..d4dcfe27 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
@@ -18,6 +18,16 @@
  */
 package org.apache.ozhera.log.utils;
 
+import cn.hutool.core.date.CalendarUtil;
+import cn.hutool.core.date.DateException;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.format.DateParser;
+import cn.hutool.core.lang.PatternPool;
+import cn.hutool.core.util.CharUtil;
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.ReUtil;
+import cn.hutool.core.util.StrUtil;
 import org.apache.commons.lang3.StringUtils;
 
 import java.text.ParseException;
@@ -25,18 +35,40 @@ import java.text.SimpleDateFormat;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.Calendar;
+import java.time.format.DateTimeParseException;
 import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+
+import static cn.hutool.core.date.DateUtil.*;
 
 /**
  * @author wtt
  * @version 1.0
  * @description
- * @date 2021/9/29 10:30
+ * @date 2024/12/29 12:40
  */
-public class DateUtils {
+public class DateUtils extends CalendarUtil {
 
     public static long dayms = 86400000L;
+    private final static String[] wtb = {
+            "sun", "mon", "tue", "wed", "thu", "fri", "sat",
+            "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", 
"oct", "nov", "dec",
+            "gmt", "ut", "utc", "est", "edt", "cst", "cdt", "mst", "mdt", 
"pst", "pdt"
+    };
+
+    public static String getDaysAgo(int n) {
+        return new 
SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis() - n * dayms);
+    }
+
+    public static String timeStamp2Date(String millSeconds, String format) {
+        if (StringUtils.isBlank(millSeconds)) {
+            return "";
+        }
+        if (format == null || format.isEmpty()) format = "yyyy-MM-dd HH:mm:ss";
+        SimpleDateFormat sdf = new SimpleDateFormat(format);
+        return sdf.format(new Date(Long.valueOf(millSeconds)));
+    }
 
     public static String getTime() {
         DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy.MM.dd");
@@ -53,77 +85,115 @@ public class DateUtils {
         return tomorrow.format(formatter);
     }
 
-    /**
-     * Get the first millisecond today
-     *
-     * @return
-     */
-    public static long getTodayFirstMillisecond() {
-        Date date = new Date();
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTime(date);
-        date = calendar.getTime();
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+    public static Long getThisDayFirstMillisecond(String thisDay) {
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+        Date date = null;
         try {
-            date = sdf.parse(sdf.format(date));
+            date = format.parse(thisDay);
         } catch (ParseException e) {
             e.printStackTrace();
         }
         return date.getTime();
     }
 
-    /**
-     * Get yesterday's first millisecond
-     *
-     * @return]
-     */
-    public static long getYesterdayFirstMillisecond() {
-        return getTodayFirstMillisecond() - dayms;
-    }
+    private static String normalize(CharSequence dateStr) {
+        if (StrUtil.isBlank(dateStr)) {
+            return StrUtil.str(dateStr);
+        }
 
-    /**
-     * Get the first millisecond of the day before yesterday
-     *
-     * @return]
-     */
-    public static long getBeforeYesterdayFirstMillisecond() {
-        return getTodayFirstMillisecond() - dayms * 2;
-    }
+        final List<String> dateAndTime = StrUtil.splitTrim(dateStr, ' ');
+        final int size = dateAndTime.size();
+        if (size < 1 || size > 2) {
+            return StrUtil.str(dateStr);
+        }
 
-    /**
-     * Gets the first millisecond of the day
-     *
-     * @param thisDay
-     * @return
-     */
-    public static Long getThisDayFirstMillisecond(String thisDay) {
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-        Date date = null;
-        try {
-            date = format.parse(thisDay);
-        } catch (ParseException e) {
-            e.printStackTrace();
+        final StringBuilder builder = StrUtil.builder();
+
+        String datePart = dateAndTime.get(0).replaceAll("[/.年月]", "-");
+        datePart = StrUtil.removeSuffix(datePart, "日");
+        builder.append(datePart);
+
+        if (size == 2) {
+            builder.append(' ');
+            String timePart = dateAndTime.get(1).replaceAll("[时分秒]", ":");
+            timePart = StrUtil.removeSuffix(timePart, ":");
+            timePart = timePart.replace(',', '.');
+            builder.append(timePart);
         }
-        return date.getTime();
+
+        return builder.toString();
     }
 
-    /**
-     * Get n days ago date (yyyy-MM-dd)
-     *
-     * @param n
-     * @return
-     */
-    public static String getDaysAgo(int n) {
-        return new 
SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis() - n * dayms);
+    public static DateTime parse(CharSequence dateStr, DateParser parser) {
+        return new DateTime(dateStr, parser);
     }
 
-    public static String timeStamp2Date(String millSeconds, String format) {
-        if (StringUtils.isBlank(millSeconds)) {
-            return "";
+    public static DateTime parse(CharSequence dateCharSequence) {
+        if (StrUtil.isBlank(dateCharSequence)) {
+            return null;
         }
-        if (format == null || format.isEmpty()) format = "yyyy-MM-dd HH:mm:ss";
-        SimpleDateFormat sdf = new SimpleDateFormat(format);
-        return sdf.format(new Date(Long.valueOf(millSeconds)));
+        String dateStr = dateCharSequence.toString();
+        dateStr = StrUtil.removeAll(dateStr.trim(), '日', '秒');
+        int length = dateStr.length();
+        if (NumberUtil.isNumber(dateStr)) {
+            if (length == DatePattern.PURE_DATETIME_PATTERN.length()) {
+                return parse(dateStr, DatePattern.PURE_DATETIME_FORMAT);
+            } else if (length == 
DatePattern.PURE_DATETIME_MS_PATTERN.length()) {
+                return parse(dateStr, DatePattern.PURE_DATETIME_MS_FORMAT);
+            } else if (length == DatePattern.PURE_DATE_PATTERN.length()) {
+                return parse(dateStr, DatePattern.PURE_DATE_FORMAT);
+            } else if (length == DatePattern.PURE_TIME_PATTERN.length()) {
+                return parse(dateStr, DatePattern.PURE_TIME_FORMAT);
+            }
+        } else if (ReUtil.isMatch(PatternPool.TIME, dateStr)) {
+            return parseTimeToday(dateStr);
+        } else if (StrUtil.containsAnyIgnoreCase(dateStr, wtb)) {
+            return parseCST(dateStr);
+        } else if (StrUtil.contains(dateStr, 'T')) {
+            // UTC时间
+            return parseUTC(dateStr);
+        }
+
+        dateStr = normalize(dateStr);
+//        if (ReUtil.isMatch(DatePattern.REGEX_NORM, dateStr)) {
+        if (parseDateTime(dateStr).isPresent()) {
+            final int colonCount = StrUtil.count(dateStr, CharUtil.COLON);
+            switch (colonCount) {
+                case 0:
+                    return parse(dateStr, DatePattern.NORM_DATE_FORMAT);
+                case 1:
+                    return parse(dateStr, 
DatePattern.NORM_DATETIME_MINUTE_FORMAT);
+                case 2:
+                    final int indexOfDot = StrUtil.indexOf(dateStr, 
CharUtil.DOT);
+                    if (indexOfDot > 0) {
+                        final int length1 = dateStr.length();
+                        if (length1 - indexOfDot > 4) {
+                            dateStr = StrUtil.subPre(dateStr, indexOfDot + 4);
+                        }
+                        return parse(dateStr, 
DatePattern.NORM_DATETIME_MS_FORMAT);
+                    }
+                    return parse(dateStr, DatePattern.NORM_DATETIME_FORMAT);
+            }
+        }
+        throw new DateException("No format fit for date String [{}] !", 
dateStr);
     }
 
+    private static final List<DateTimeFormatter> FORMATTERS = List.of(
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SS"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd")
+    );
+
+    public static Optional<LocalDateTime> parseDateTime(String dateStr) {
+        for (DateTimeFormatter formatter : FORMATTERS) {
+            try {
+                return Optional.of(LocalDateTime.parse(dateStr, formatter));
+            } catch (DateTimeParseException ignored) {
+            }
+        }
+        return Optional.empty();
+    }
 }
diff --git 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index c6ab6ba4..e2867cd3 100644
--- 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++ 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ozhera.log.common;
 
+import cn.hutool.core.date.DateUtil;
 import com.google.common.base.Stopwatch;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.time.DateParser;
@@ -47,7 +48,7 @@ public class LogParserTest {
     @Test
     public void test1() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        String keyList = 
"timestamp:date,podName:keyword,level:keyword,threadName:text,className:text,line:keyword,methodName:keyword,traceId:keyword,message:text,ip:ip,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+        String keyList = 
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
         String valueList = 
"0,-1,16,-1,-1,-1,-1,-1,-1,1,2,3,4,5,6,7,8,9,10,11,13,12,17,14,15";
         String parseScript = "|";
         String logData = "";
@@ -66,14 +67,14 @@ public class LogParserTest {
     @Test
     public void test2() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        String keyList = 
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,time:keyword,log_level:keyword,thread_name:keyword,log_name:keyword,trace_id:keyword,user_login_name:keyword,marker:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
-        String keyOrderList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
-        String valueList = "-1,7,0,1,2,3,4,5,6";
-        String parseScript = 
"(?s)(?s)(?s)(?s)(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s\\+\\d{4})\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s([\\s\\S]*)";
-        String logData = "";
+        String keyList = 
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+        String keyOrderList = 
"timestamp:1,level:1,traceId:1,threadName:1,className:1,line:1,methodName:1,message:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3";
+        String valueList = "0,1,2,3,4,5,6,7";
+        String parseScript = "|";
+        String logData = 
"{\"lineNumber\":142713,\"fileName\":\"/home/work/log/nr-trade-pay-992063-585c564f9b-2b8jq/pay/server.log\",\"pointer\":46974955,\"msgBody\":\"2025-03-04
 
15:32:12,412|DEBUG|ee839f4c4c5a9a8788fa54929f6ef20f|DubboServerHandler-10.159.32.249:20880-thread-797|c.x.n.p.i.r.d.m.L.selectLoanCreditList|143|==>
 Parameters: 
5255102265996170(String)\",\"extMap\":{\"ct\":\"1741073532486\",\"ip\":\"10.159.32.249\",\"tag\":\"tags_60006_48_96833\",\"type\":\"1\"}}";
         String ip = "127.0.0.1";
         Long currentStamp = Instant.now().toEpochMilli();
-        Integer parserType = 
LogParserFactory.LogParserEnum.REGEX_PARSE.getCode();
+        Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
         LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, 
keyOrderList);
         Map<String, Object> parse = customParse.parse(logData, ip, 1l, 
currentStamp, "");
         System.out.println(parse);
@@ -97,23 +98,31 @@ public class LogParserTest {
         Map<String, Object> parse = customParse.parse(logData, ip, 1l, 
currentStamp, "");
         System.out.println(parse);
 
-        System.out.println(customParse.getTimestampFromString("2023-08-25 
10:46:09.239", currentStamp));
+        System.out.println(customParse.getTimestampFromString("2025-02-26 
19:25:00,35", currentStamp));
         stopwatch.stop();
         log.info("cost time:{}", stopwatch.elapsed().toMillis());
     }
 
     @Test
     public void parseSimpleTest() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
         Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
-        String keyList = 
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,datetime:date,project_name:keyword,client_ip:keyword,level:keyword,log_id:keyword,url:keyword,up_ip:keyword,logger_line:keyword,thread:keyword,biz_id:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
-        String leyOrderList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,datetime:1,project_name:1,client_ip:1,level:1,log_id:1,url:1,up_ip:1,logger_line:1,thread:1,biz_id:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
-        String valueList = "-1,10,0,1,2,3,4,5,6,7,8,9";
-        String parseScript = "]|[";
-        String message = "2025-02-12T20:11:02.501+0800]";
+        String keyList = 
"timestamp:date,level:keyword,threadName:text,traceId:keyword,className:text,line:keyword,message:keyword,methodName:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+        String leyOrderList = 
"timestamp:1,level:1,threadName:1,traceId:1,className:1,line:1,message:1,methodName:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3";
+        String valueList = "0,1,2,3,4,5,6,-1";
+        String parseScript = "|";
+        String message = "2025-03-03 10:03:34,128|INFO 
|DubboServerHandler-10.157.62.39:20880-thread-203|afb5a702e39130c59f89198d83026ace|c.x.n.p.a.pool.CarActivitySearcher|?|car
 activity search result:[]";
         Long collectStamp = Instant.now().toEpochMilli();
         LogParser logParser = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, leyOrderList);
-        Map<String, Object> parseMsg = logParser.parseSimple(message, 
collectStamp);
-        Assert.assertNotNull(parseMsg);
+        for (int i = 0; i < 10000; i++) {
+            Map<String, Object> parseMsg = logParser.parse(message, "", 1l, 
collectStamp, "");
+
+//            System.out.println(logParser.getTimestampFromString("2025-02-26 
19:25:00,35", System.currentTimeMillis()));
+            System.out.println(DateUtil.parse("2025-02-26 
19:25:00,35").getTime());
+            Assert.assertNotNull(parseMsg);
+        }
+        stopwatch.stop();
+        log.info("cost time:{}", stopwatch.elapsed().toMillis());
     }
 
     @Test
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
index 601c8e4c..8c5a07cf 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
@@ -22,8 +22,15 @@ import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.lang.Pair;
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
-import org.apache.ozhera.app.api.model.HeraAppEnvData;
 import com.xiaomi.mone.es.EsClient;
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.StringUtils;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.app.api.model.HeraAppEnvData;
 import org.apache.ozhera.log.api.enums.LogStorageTypeEnum;
 import org.apache.ozhera.log.api.model.dto.TraceLogDTO;
 import org.apache.ozhera.log.api.model.vo.TraceLogQuery;
@@ -58,20 +65,15 @@ import 
org.apache.ozhera.log.manager.service.HeraAppEnvService;
 import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionService;
 import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
 import org.apache.ozhera.log.parse.LogParser;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.StringUtils;
-import com.xiaomi.youpin.docean.plugin.config.anno.Value;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
-import com.xiaomi.youpin.docean.plugin.es.EsService;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.poi.hssf.usermodel.HSSFWorkbook;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@@ -188,7 +190,7 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
             if (LogStorageTypeEnum.DORIS == storageTypeEnum) {
                 return dorisDataQuery(logQuery, milogLogstoreDO, dto);
             } else {
-                return elasticDataQuery(milogLogstoreDO, logQuery, dto, 
keyList, stopWatch);
+                return elasticDataQuery(milogLogstoreDO, logQuery, dto, 
keyList, stopWatch, operator);
             }
         } catch (Throwable e) {
             log.error("Log query error, log search 
error,logQuery:[{}],user:[{}]", logQuery, MoneUserContext.getCurrentUser(), e);
@@ -215,7 +217,7 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
     }
 
     private Result<LogDTO> elasticDataQuery(MilogLogStoreDO milogLogstoreDO, 
LogQuery logQuery,
-                                            LogDTO dto, List<String> keyList, 
StopWatch stopWatch) throws IOException {
+                                            LogDTO dto, List<String> keyList, 
StopWatch stopWatch, String operator) throws IOException {
         EsService esService = 
esCluster.getEsService(milogLogstoreDO.getEsClusterId());
 
         String esIndexName = 
commonExtensionService.getSearchIndex(logQuery.getStoreId(), 
milogLogstoreDO.getEsIndex());
@@ -228,6 +230,12 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
         SearchSourceBuilder builder = assembleSearchSourceBuilder(logQuery, 
keyList, boolQueryBuilder);
 
         SearchRequest searchRequest = new SearchRequest(new 
String[]{esIndexName}, builder);
+
+        boolean isTimestampMissing = isTimestampMissingInQuery(searchRequest);
+        if (isTimestampMissing) {
+            log.warn("searchRequest is missing timestamp field, add timestamp 
field,logQuery:{}, operator:{}", GSON.toJson(logQuery), operator);
+        }
+
         // query
         stopWatch.start("search-query");
         SearchResponse searchResponse = esService.search(searchRequest);
@@ -235,7 +243,7 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
 
         dto.setSourceBuilder(builder);
         if (stopWatch.getLastTaskTimeMillis() > 7 * 1000) {
-            log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}", 
stopWatch.getLastTaskName(), stopWatch.getLastTaskTimeMillis());
+            log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}", 
stopWatch.getLastTaskName(), stopWatch.getLastTaskTimeMillis(), 
GSON.toJson(logQuery));
         }
         //Result transformation
         stopWatch.start("data-assemble");
@@ -243,11 +251,82 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
         stopWatch.stop();
 
         if (stopWatch.getTotalTimeMillis() > 15 * 1000) {
-            log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}", "gt15s", 
stopWatch.getTotalTimeMillis());
+            log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}", "gt15s", 
stopWatch.getTotalTimeMillis(), GSON.toJson(logQuery));
         }
         return Result.success(dto);
     }
 
+    /**
+     * Reports whether a search request lacks a range condition on the
+     * {@code "timestamp"} field.
+     *
+     * @param searchRequest request to inspect; may be null
+     * @return true when the request, its source or its query is null, or when no
+     *         {@code "timestamp"} range clause is found; false otherwise
+     */
+    public static boolean isTimestampMissingInQuery(SearchRequest searchRequest) {
+        if (searchRequest == null) {
+            return true;
+        }
+
+        SearchSourceBuilder sourceBuilder = searchRequest.source();
+        if (sourceBuilder == null || sourceBuilder.query() == null) {
+            return true;
+        }
+
+        QueryBuilder rootQuery = sourceBuilder.query();
+        if (rootQuery instanceof BoolQueryBuilder) {
+            return !containsTimestampField((BoolQueryBuilder) rootQuery);
+        }
+
+        // A root query that is itself a range on "timestamp" is a valid time bound;
+        // any other non-bool root query carries none.
+        return !isTimestampFieldInQuery(rootQuery);
+    }
+
+    /**
+     * Tells whether any direct clause of a bool query targets the timestamp field.
+     * Clause groups are examined in the order must, filter, should, must_not, and
+     * the scan stops at the first match.
+     *
+     * @param boolQuery bool query whose clauses are inspected
+     * @return true if at least one clause is recognised as a timestamp condition;
+     *         false otherwise
+     */
+    public static boolean containsTimestampField(BoolQueryBuilder boolQuery) {
+        return hasTimestampClause(boolQuery.must())
+                || hasTimestampClause(boolQuery.filter())
+                || hasTimestampClause(boolQuery.should())
+                || hasTimestampClause(boolQuery.mustNot());
+    }
+
+    /**
+     * Scans one clause group for a timestamp condition.
+     *
+     * @param clauses clauses of a single bool occurrence type
+     * @return true if any clause is recognised as a timestamp condition
+     */
+    private static boolean hasTimestampClause(List<QueryBuilder> clauses) {
+        for (QueryBuilder clause : clauses) {
+            if (isTimestampFieldInQuery(clause)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether a single query constrains the {@code "timestamp"} field.
+     * A range query matches when its field name is {@code "timestamp"}; a nested
+     * bool query matches when any of its own clauses does.
+     *
+     * @param query QueryBuilder to be checked
+     * @return true if a timestamp range condition is found; otherwise false
+     */
+    private static boolean isTimestampFieldInQuery(QueryBuilder query) {
+        if (query instanceof RangeQueryBuilder) {
+            RangeQueryBuilder rangeQuery = (RangeQueryBuilder) query;
+            return "timestamp".equals(rangeQuery.fieldName());
+        }
+        if (query instanceof BoolQueryBuilder) {
+            // Range clauses are often wrapped in a nested bool; without descending,
+            // such queries would be reported as missing the timestamp bound.
+            return containsTimestampField((BoolQueryBuilder) query);
+        }
+        return false;
+    }
+
     private void dorisDataToLog(List<Map<String, Object>> tableColumnDTOS, 
LogDTO logDTO) {
         List<LogDataDTO> logDataList = Lists.newArrayList();
         for (Map<String, Object> columnMap : tableColumnDTOS) {
@@ -471,6 +550,10 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
         if (!StringUtils.isEmpty(interval)) {
             BoolQueryBuilder queryBuilder = 
searchLog.getQueryBuilder(logQuery, getKeyColonPrefix(logStore.getKeyList()));
             String histogramField = 
commonExtensionService.queryDateHistogramField(logQuery.getStoreId());
+            // The flag is true when the histogram query has no "timestamp" range clause.
+            boolean isTimestampMissing = !containsTimestampField(queryBuilder);
+            if (isTimestampMissing) {
+                log.warn("searchRequest is missing timestamp field, add timestamp field,logQuery:{}", GSON.toJson(logQuery));
+            }
             EsClient.EsRet esRet = esService.dateHistogram(esIndex, 
histogramField, interval, logQuery.getStartTime(), logQuery.getEndTime(), 
queryBuilder);
             result.setCounts(esRet.getCounts());
             result.setTimestamps(esRet.getTimestamps());
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index f948bcbe..6b657be1 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -155,12 +155,16 @@ public class MilogConfigListener {
     }
 
     private void stopOldJobsForRemovedTailIds(SinkConfig sinkConfig) {
-        List<Long> newIds = 
sinkConfig.getLogtailConfigs().stream().map(LogtailConfig::getLogtailId).collect(Collectors.toList());
+        List<Long> newIds = Lists.newArrayList();
+        if (CollectionUtils.isNotEmpty(sinkConfig.getLogtailConfigs())) {
+            newIds = 
sinkConfig.getLogtailConfigs().stream().map(LogtailConfig::getLogtailId).collect(Collectors.toList());
+        }
         List<Long> oldIds = Lists.newArrayList();
         if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
             oldIds = 
oldSinkConfigMap.get(sinkConfig.getLogstoreId()).getLogtailConfigs().stream().map(LogtailConfig::getLogtailId).collect(Collectors.toList());
         }
-        List<Long> collect = oldIds.stream().filter(tailId -> 
!newIds.contains(tailId)).collect(Collectors.toList());
+        List<Long> finalNewIds = newIds;
+        List<Long> collect = oldIds.stream().filter(tailId -> 
!finalNewIds.contains(tailId)).collect(Collectors.toList());
         if (CollectionUtils.isNotEmpty(collect)) {
             log.info("newIds:{},oldIds:{},collect:{}", gson.toJson(newIds), 
gson.toJson(oldIds), gson.toJson(collect));
             for (Long tailId : collect) {
@@ -211,6 +215,9 @@ public class MilogConfigListener {
         if (null != sinkConfig) {
             log.info("[listen tail] The task to stop:{}", 
gson.toJson(sinkConfig.getLogtailConfigs()));
             List<LogtailConfig> logTailConfigs = 
sinkConfig.getLogtailConfigs();
+            if (CollectionUtils.isEmpty(logTailConfigs)) {
+                return;
+            }
             for (LogtailConfig logTailConfig : logTailConfigs) {
                 stopOldJobForTail(logTailConfig, sinkConfig);
             }
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index 3270852e..41665703 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -96,9 +96,11 @@ public class JobManager {
 
     public void stopJob(LogtailConfig logtailConfig) {
         try {
-            List<Long> jobKeys = 
jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
-            if(CollectionUtils.isNotEmpty(jobKeys)){
-                log.info("【stop job】,all jobs:{}", jobKeys);
+            List<Long> jobKeys = jobs.keySet().stream()
+                    .filter(key -> key.equals(logtailConfig.getLogtailId()))
+                    .collect(Collectors.toList());
+            if (CollectionUtils.isNotEmpty(jobKeys)) {
+                log.info("stop job,the jobs:{}", jobKeys);
                 sinkJobsShutDown(logtailConfig);
             }
         } catch (Exception e) {
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
index d63f3896..c312e965 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
@@ -21,6 +21,11 @@ package org.apache.ozhera.log.stream.job;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.RateLimiter;
+import com.xiaomi.youpin.docean.Ioc;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.api.model.msg.LineMessage;
 import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.parse.LogParser;
@@ -31,10 +36,6 @@ import 
org.apache.ozhera.log.stream.job.extension.MessageLifecycleManager;
 import org.apache.ozhera.log.stream.job.extension.MessageSender;
 import org.apache.ozhera.log.stream.job.extension.MqMessagePostProcessing;
 import org.apache.ozhera.log.stream.sink.SinkChain;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
 
 import java.time.Instant;
 import java.util.Map;
@@ -143,7 +144,9 @@ public class LogDataTransfer {
         dataMap.putIfAbsent(LOG_STREAM_SPACE_ID, 
sinkJobConfig.getLogSpaceId());
         dataMap.putIfAbsent(LOG_STREAM_STORE_ID, 
sinkJobConfig.getLogStoreId());
         dataMap.putIfAbsent(LOG_STREAM_TAIL_ID, sinkJobConfig.getLogTailId());
-        dataMap.putIfAbsent(DEPLOY_SPACE, sinkJobConfig.getDeploySpace());
+        if (StringUtils.isNotBlank(sinkJobConfig.getDeploySpace())) {
+            dataMap.putIfAbsent(DEPLOY_SPACE, sinkJobConfig.getDeploySpace());
+        }
     }
 
     private void sendMessage(Map<String, Object> dataMap) throws Exception {
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
index 08e3d0c8..7c878beb 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
@@ -63,7 +63,7 @@ public class EsPlugin {
 
     private static int DEFAULT_PROCESSOR_COUNT = 1;
 
-    private static ReentrantLock esLock = new ReentrantLock();
+    private static final ReentrantLock esLock = new ReentrantLock();
 
     private static Gson gson = new Gson();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to