This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 4404f158 feat: optimize log parsing and statistics functions (#548)
4404f158 is described below

commit 4404f1587bf5e26f384989d368d4fb885fd616a4
Author: wtt <[email protected]>
AuthorDate: Tue Feb 18 16:42:17 2025 +0800

    feat: optimize log parsing and statistics functions (#548)
    
    * feat(log): optimize log parsing and statistics functions
    
    * refactor: optimize log parsing and configuration services
---
 .../org/apache/ozhera/log/model/SinkConfig.java    |  4 +
 .../apache/ozhera/log/parse/AbstractLogParser.java |  5 ++
 .../org/apache/ozhera/log/parse/LogParserData.java |  4 +
 .../apache/ozhera/log/parse/LogParserFactory.java  |  7 +-
 .../ozhera/log/parse/SeparatorLogParser.java       | 82 +++++++++++++-------
 .../org/apache/ozhera/log/utils/IndexUtils.java    |  2 +-
 .../apache/ozhera/log/common/IndexUtilsTest.java   |  4 +-
 .../ozhera/log/common/LogParserFactoryTest.java    |  2 +-
 .../apache/ozhera/log/common/LogParserTest.java    | 31 ++++++--
 .../ozhera/log/manager/job/TailLogCountJob.java    | 12 ++-
 .../service/impl/AgentConfigServiceImpl.java       |  9 ++-
 .../manager/service/impl/LogCountServiceImpl.java  | 88 ++++++++++++++++------
 .../manager/service/impl/LogTailServiceImpl.java   | 20 ++---
 .../service/impl/MilogConfigNacosServiceImpl.java  | 11 +--
 .../main/resources/mapper/MilogLogstailMapper.xml  |  2 +
 .../ozhera/log/stream/config/ConfigManager.java    | 39 +++++-----
 .../apache/ozhera/log/stream/job/JobManager.java   |  1 +
 .../ozhera/log/stream/job/SinkJobConfig.java       |  9 ++-
 .../impl/DefaultStreamCommonExtension.java         | 15 +++-
 .../job/extension/kafka/KafkaSinkJobProvider.java  |  6 +-
 .../rocketmq/RocketMqSinkJobProvider.java          |  4 +-
 .../apache/ozhera/log/stream/TestSomething.java    |  8 +-
 22 files changed, 240 insertions(+), 125 deletions(-)

diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
index d04e5031..ce991a0e 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
@@ -35,6 +35,9 @@ public class SinkConfig {
      * timestamp is required
      */
     private String keyList;
+
+    private String keyOrderList;
+
     /**
      * key:logtailId
      */
@@ -56,6 +59,7 @@ public class SinkConfig {
         this.logstoreId = sinkConfig.getLogstoreId();
         this.logstoreName = sinkConfig.getLogstoreName();
         this.keyList = sinkConfig.getKeyList();
+        this.keyOrderList = sinkConfig.getKeyOrderList();
         this.esIndex = sinkConfig.getEsIndex();
         this.esInfo = sinkConfig.getEsInfo();
     }
diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index b500173a..ebc260f6 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -143,6 +143,11 @@ public abstract class AbstractLogParser implements LogParser {
         }
     }
 
+    protected String[] splitList(String keyList) {
+        return StringUtils.split(keyList, ",");
+    }
+
+
     public abstract Map<String, Object> doParse(String logData, String ip, 
Long lineNum, Long collectStamp, String fileName);
 
     public abstract Map<String, Object> doParseSimple(String logData, Long 
collectStamp);
diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
index 211d3cea..6557d809 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
@@ -33,6 +33,10 @@ import lombok.*;
 @Builder
 public class LogParserData {
     private String keyList;
+
+    //Use this field to parse content more easily
+    private String keyOrderList;
+
     private String valueList;
     private String parseScript;
     private String topicName;
diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
index 36edef25..6e4ea33f 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
@@ -34,13 +34,14 @@ public class LogParserFactory {
     private LogParserFactory() {
     }
 
-    public static LogParser getLogParser(Integer parseType, String keyList, 
String valueList, String parseScript) {
-        return LogParserFactory.getLogParser(parseType, keyList, valueList, 
parseScript, "", "", "", "");
+    public static LogParser getLogParser(Integer parseType, String keyList, 
String valueList, String parseScript, String keyOrderList) {
+        return LogParserFactory.getLogParser(parseType, keyList, valueList, 
parseScript, "", "", "", "", keyOrderList);
     }
 
     public static LogParser getLogParser(Integer parseType, String keyList, 
String valueList, String parseScript,
-                                         String topicName, String tailName, 
String mqTag, String logStoreName) {
+                                         String topicName, String tailName, 
String mqTag, String logStoreName, String keyOrderList) {
         LogParserData logParserData = LogParserData.builder().keyList(keyList)
+                .keyOrderList(keyOrderList)
                 .valueList(valueList)
                 .parseScript(parseScript)
                 .topicName(topicName)
diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
index 9c5e142d..44035e4d 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
@@ -20,13 +20,12 @@ package org.apache.ozhera.log.parse;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * @author wtt
@@ -38,10 +37,29 @@ public class SeparatorLogParser extends AbstractLogParser {
     private String[] keysAndTypes;
     private String[] values;
 
+    private Map<String, Integer> valueMap;
+
     public SeparatorLogParser(LogParserData parserData) {
         super(parserData);
-        keysAndTypes = StringUtils.split(parserData.getKeyList(), ",");
-        values = StringUtils.split(parserData.getValueList(), ",");
+
+        keysAndTypes = splitList(parserData.getKeyList());
+        values = splitList(parserData.getValueList());
+
+        this.valueMap = createValueMap(parseKeyValueList(parserData));
+    }
+
+    private List<String> parseKeyValueList(LogParserData parserData) {
+        if (StringUtils.isNotBlank(parserData.getKeyOrderList())) {
+            String keyValueList = 
IndexUtils.getKeyValueList(parserData.getKeyOrderList(), 
parserData.getValueList());
+            return Arrays.asList(splitList(keyValueList));
+        }
+        return Collections.emptyList();
+    }
+
+    private Map<String, Integer> createValueMap(List<String> valueList) {
+        return IntStream.range(0, valueList.size())
+                .boxed()
+                .collect(Collectors.toMap(valueList::get, 
Function.identity()));
     }
 
     @Override
@@ -60,7 +78,7 @@ public class SeparatorLogParser extends AbstractLogParser {
         }
         try {
 
-            int maxLength = (int) Arrays.stream(values).filter(s -> 
!s.equals("-1")).count();
+            int maxLength = (int) Arrays.stream(values).filter(s -> 
!"-1".equals(s)).count();
 
             List<String> logArray = parseLogData(logData, maxLength);
             if (0 == maxLength) {
@@ -81,7 +99,7 @@ public class SeparatorLogParser extends AbstractLogParser {
              */
             for (int i = 0; i < keysAndTypes.length; i++) {
                 String[] kTsplit = keysAndTypes[i].split(":");
-                if (kTsplit.length != 2 || i >= values.length) {
+                if (kTsplit.length != 2 || i >= values.length && (null == 
valueMap || valueMap.isEmpty())) {
                     continue;
                 }
                 if (kTsplit[0].equals(esKeyMap_topic)) {
@@ -104,27 +122,35 @@ public class SeparatorLogParser extends AbstractLogParser {
                     count++;
                     continue;
                 }
-                String value = null;
-                int num = -1;
-                try {
-                    num = Integer.parseInt(values[i]);
-                    if (num == -1) {
-                        valueCount++;
-                        continue;
+                if (null != valueMap && !valueMap.isEmpty()) {
+                    String key = kTsplit[0].trim();
+                    if (valueMap.containsKey(key)) {
+                        String value = logArray.get(valueMap.get(key));
+                        ret.put(key, value);
                     }
-                } catch (Exception e) {
-                    continue;
-                }
-                if (num < logArray.size() && num > -1) {
-                    value = logArray.get(num);
-                } else {
-                    value = "";
-                }
-                if (kTsplit[0].equals(esKeyMap_timestamp) || 
kTsplit[1].equalsIgnoreCase(esKeyMap_Date)) {
-                    Long time = getTimestampFromString(value, collectStamp);
-                    ret.put(esKeyMap_timestamp, time);
                 } else {
-                    ret.put(kTsplit[0], StringUtils.isNotEmpty(value) ? 
value.trim() : value);
+                    String value = null;
+                    int num = -1;
+                    try {
+                        num = Integer.parseInt(values[i]);
+                        if (num == -1) {
+                            valueCount++;
+                            continue;
+                        }
+                    } catch (Exception e) {
+                        continue;
+                    }
+                    if (num < logArray.size() && num > -1) {
+                        value = logArray.get(num);
+                    } else {
+                        value = "";
+                    }
+                    if (kTsplit[0].equals(esKeyMap_timestamp) || 
kTsplit[1].equalsIgnoreCase(esKeyMap_Date)) {
+                        Long time = getTimestampFromString(value, 
collectStamp);
+                        ret.put(esKeyMap_timestamp, time);
+                    } else {
+                        ret.put(kTsplit[0], StringUtils.isNotEmpty(value) ? 
value.trim() : value);
+                    }
                 }
             }
 
diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
index ac60bd10..98b263d3 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
@@ -43,7 +43,7 @@ public class IndexUtils {
         Map<Integer, String> map = new HashMap<>();
         for (int i = 0; i < valueS.length; i++) {
             int orderValue = Integer.parseInt(valueS[i]);
-            if (orderValue >= 0) {
+            if (orderValue >= 0 && i < keyListSlice.size()) {
                 map.put(orderValue, keyListSlice.get(i));
             }
         }
diff --git a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
index d20a3d67..c3b17031 100644
--- a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
+++ b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
@@ -33,8 +33,8 @@ public class IndexUtilsTest {
 
     @Test
     public void testGetKeyValueList(){
-        String keyList = 
"timestamp:1,level:1,traceId:1,threadName:1,className:1,message:1,line:1,methodName:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3";
-        String valueList = "0,2,1,6,3,4,5,-1";
+        String keyList = 
"timestamp:1,level:1,traceId:1,threadName:1,className:1,line:1,message:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3";
+        String valueList = "0,1,2,3,4,5,6,7";
         String keyValueList = IndexUtils.getKeyValueList(keyList, valueList);
         log.info(keyValueList);
     }
diff --git a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
index c7842868..843a2ebf 100644
--- a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
+++ b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
@@ -52,7 +52,7 @@ public class LogParserFactoryTest {
         String mqTag = "fsfsd";
         String logStoreName = "testet";
         String message = "";
-        LogParser logParser = LogParserFactory.getLogParser(parseType, 
keyList, valueList, parseScript, topicName, tailName, mqTag, logStoreName);
+        LogParser logParser = LogParserFactory.getLogParser(parseType, 
keyList, valueList, parseScript, topicName, tailName, mqTag, logStoreName,"");
 
         LineMessage lineMessage = Constant.GSON.fromJson(message, 
LineMessage.class);
         if(lineMessage != null) {
diff --git a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index 9026b535..d7499600 100644
--- a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++ b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -19,11 +19,12 @@
 package org.apache.ozhera.log.common;
 
 import com.google.common.base.Stopwatch;
-import org.apache.ozhera.log.parse.LogParser;
-import org.apache.ozhera.log.parse.LogParserFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.time.DateParser;
 import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.ozhera.log.parse.LogParser;
+import org.apache.ozhera.log.parse.LogParserFactory;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.time.Instant;
@@ -53,7 +54,7 @@ public class LogParserTest {
         String ip = "127.0.0.1";
         Long currentStamp = Instant.now().toEpochMilli();
         Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
-        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName);
+        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
         Map<String, Object> parse = customParse.parse(logData, ip, 1l, 
currentStamp, "");
         System.out.println(parse);
 
@@ -65,20 +66,34 @@ public class LogParserTest {
     @Test
     public void test2() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        String keyList = 
"message:text,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
-        String valueList = "0";
+        String keyList = 
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:text,podName:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+        String valueList = "0,1,2,3,4,5,-1,6,-1";
         String parseScript = "|";
-        String logData = "";
+        String logData = "2025-02-13 16:52:09,325|INFO";
         String ip = "127.0.0.1";
         Long currentStamp = Instant.now().toEpochMilli();
-        Integer parserType = 
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
-        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName);
+        Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
+        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
         Map<String, Object> parse = customParse.parse(logData, ip, 1l, 
currentStamp, "");
         System.out.println(parse);
         stopwatch.stop();
         log.info("cost time:{}", stopwatch.elapsed().toMillis());
     }
 
+    @Test
+    public void parseSimpleTest() {
+        Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
+        String keyList = 
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,datetime:date,project_name:keyword,client_ip:keyword,level:keyword,log_id:keyword,url:keyword,up_ip:keyword,logger_line:keyword,thread:keyword,biz_id:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+        String leyOrderList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,datetime:1,project_name:1,client_ip:1,level:1,log_id:1,url:1,up_ip:1,logger_line:1,thread:1,biz_id:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+        String valueList = "-1,10,0,1,2,3,4,5,6,7,8,9";
+        String parseScript = "]|[";
+        String message = "2025-02-12T20:11:02.501+0800]";
+        Long collectStamp = Instant.now().toEpochMilli();
+        LogParser logParser = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, leyOrderList);
+        Map<String, Object> parseMsg = logParser.parseSimple(message, 
collectStamp);
+        Assert.assertNotNull(parseMsg);
+    }
+
     @Test
     public void test() {
         System.out.println("1.647590227174E12".length());
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
index ca77cc5a..1241cba2 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
@@ -19,11 +19,11 @@
 package org.apache.ozhera.log.manager.job;
 
 import cn.hutool.core.thread.ThreadUtil;
-import org.apache.ozhera.log.manager.service.impl.LogCountServiceImpl;
-import org.apache.ozhera.log.utils.DateUtils;
 import com.xiaomi.youpin.docean.anno.Component;
 import com.xiaomi.youpin.docean.plugin.config.anno.Value;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.manager.service.impl.LogCountServiceImpl;
+import org.apache.ozhera.log.utils.DateUtils;
 
 import javax.annotation.Resource;
 import java.util.concurrent.Executors;
@@ -46,11 +46,9 @@ public class TailLogCountJob {
         ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor(
                 ThreadUtil.newNamedThreadFactory("log-tailLogCountJob", false)
         );
-        long initDelay = 0;
-        long intervalTime = 1;
-        scheduledExecutor.scheduleAtFixedRate(() -> {
-            statisticsAll();
-        }, initDelay, intervalTime, TimeUnit.HOURS);
+        long initDelay = 2;
+        long intervalTime = 2 * 60;
+        scheduledExecutor.scheduleAtFixedRate(this::statisticsAll, initDelay, 
intervalTime, TimeUnit.MINUTES);
     }
 
     public void statisticsAll() {
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
index d14e7857..12c247f1 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
@@ -18,13 +18,14 @@
  */
 package org.apache.ozhera.log.manager.service.impl;
 
-import com.google.gson.Gson;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
 import org.apache.ozhera.log.api.service.AgentConfigService;
 import org.apache.ozhera.log.manager.service.extension.agent.MilogAgentService;
 import 
org.apache.ozhera.log.manager.service.extension.agent.MilogAgentServiceFactory;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.ozhera.log.common.Constant.GSON;
 
 /**
  * @author wtt
@@ -59,7 +60,7 @@ public class AgentConfigServiceImpl implements AgentConfigService {
                 init();
             }
             LogCollectMeta logCollectMeta = 
milogAgentService.getLogCollectMetaFromManager(ip);
-            log.info("getLogCollectMetaFromManager end:{} {} {}", ip, new 
Gson().toJson(logCollectMeta), (System.currentTimeMillis() - begin));
+            log.info("getLogCollectMetaFromManager end:{} {} {}", ip, 
GSON.toJson(logCollectMeta), (System.currentTimeMillis() - begin));
             return logCollectMeta;
         } catch (Exception e) {
             log.error("getLogCollectMetaFromManager error,ip:{}", ip, e);
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
index c828b2ba..b2df9182 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
@@ -18,6 +18,13 @@
  */
 package org.apache.ozhera.log.manager.service.impl;
 
+import cn.hutool.core.collection.ListUtil;
+import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.StringUtils;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.ozhera.log.common.Result;
 import org.apache.ozhera.log.manager.domain.EsCluster;
 import org.apache.ozhera.log.manager.domain.Tpc;
@@ -30,13 +37,8 @@ import org.apache.ozhera.log.manager.model.dto.SpaceCollectTrendDTO;
 import org.apache.ozhera.log.manager.model.pojo.LogCountDO;
 import org.apache.ozhera.log.manager.service.LogCountService;
 import org.apache.ozhera.log.utils.DateUtils;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.StringUtils;
-import com.xiaomi.youpin.docean.plugin.es.EsService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.elasticsearch.client.core.CountRequest;
-import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
@@ -147,11 +149,25 @@ public class LogCountServiceImpl implements LogCountService {
         }
         Long thisDayFirstMillisecond = 
DateUtils.getThisDayFirstMillisecond(thisDay);
         List<LogCountDO> logCountDOList = new ArrayList();
-        LogCountDO logCountDO;
+        List<Map<String, Object>> tailList = 
logtailMapper.getAllTailForCount();
+        Map<Long, List<String>> existIndexMap = new HashMap<>();
+        long res = 0;
+        if (tailList.size() > 2000) {
+            List<List<Map<String, Object>>> partitionList = 
ListUtil.partition(tailList, 1000);
+            for (List<Map<String, Object>> mapList : partitionList) {
+                res += calculateAndInsertLogCounts(thisDay, mapList, 
existIndexMap, thisDayFirstMillisecond, logCountDOList);
+            }
+        } else {
+            res = calculateAndInsertLogCounts(thisDay, tailList, 
existIndexMap, thisDayFirstMillisecond, logCountDOList);
+        }
+        log.info("End of statistics log,Should be counted{},Total 
statistics{}", tailList.size(), res);
+    }
+
+    private long calculateAndInsertLogCounts(String thisDay, List<Map<String, 
Object>> tailList, Map<Long, List<String>> existIndexMap, Long 
thisDayFirstMillisecond, List<LogCountDO> logCountDOList) {
+        String esIndex;
         EsService esService;
         Long total;
-        String esIndex;
-        List<Map<String, Object>> tailList = 
logtailMapper.getAllTailForCount();
+        LogCountDO logCountDO;
         for (Map<String, Object> tail : tailList) {
             try {
                 esIndex = String.valueOf(tail.get("es_index"));
@@ -159,21 +175,24 @@ public class LogCountServiceImpl implements LogCountService {
                     total = 0l;
                     esIndex = "";
                 } else {
-                    esService = 
esCluster.getEsService(Long.parseLong(String.valueOf(tail.get("es_cluster_id"))));
+                    long clusterId = 
Long.parseLong(String.valueOf(tail.get("es_cluster_id")));
+                    esService = esCluster.getEsService(clusterId);
                     if (esService == null) {
                         log.warn("Statistics logs warn,tail:{} the logs are 
not counted and the ES client is not generated", tail);
                         continue;
                     }
-                    SearchSourceBuilder builder = new SearchSourceBuilder();
-                    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
-                    boolQueryBuilder.filter(QueryBuilders.termQuery("tail", 
tail.get("tail")));
-                    
boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp").from(thisDayFirstMillisecond).to(thisDayFirstMillisecond
 + DateUtils.dayms - 1));
-                    builder.query(boolQueryBuilder);
-                    // statistics
-                    CountRequest countRequest = new CountRequest();
-                    countRequest.indices(esIndex);
-                    countRequest.source(builder);
-                    total = esService.count(countRequest);
+
+                    existIndexMap.computeIfAbsent(clusterId, k -> new 
ArrayList<>());
+                    List<String> clusterIndexes = existIndexMap.get(clusterId);
+
+                    if (!clusterIndexes.contains(esIndex)) {
+                        if (existsTemplate(esService, esIndex)) {
+                            clusterIndexes.add(esIndex);
+                        } else {
+                            continue;
+                        }
+                    }
+                    total = countLogs(esService, esIndex, tail, 
thisDayFirstMillisecond);
                 }
                 logCountDO = new LogCountDO();
                 
logCountDO.setTailId(Long.parseLong(String.valueOf(tail.get("id"))));
@@ -189,7 +208,30 @@ public class LogCountServiceImpl implements LogCountService {
         if (CollectionUtils.isNotEmpty(logCountDOList)) {
             res = logCountMapper.batchInsert(logCountDOList);
         }
-        log.info("End of statistics log,Should be counted{},Total 
statistics{}", tailList.size(), res);
+        return res;
+    }
+
+    private Long countLogs(EsService esService, String esIndex, Map<String, 
Object> tail, Long startTime) {
+        SearchSourceBuilder builder = new SearchSourceBuilder();
+        builder.query(QueryBuilders.boolQuery()
+                .filter(QueryBuilders.termQuery("tailId", tail.get("id")))
+                
.filter(QueryBuilders.rangeQuery("timestamp").from(startTime).to(startTime + 
DateUtils.dayms - 1))
+        );
+
+        CountRequest countRequest = new CountRequest(esIndex);
+        countRequest.source(builder);
+
+        try {
+            return esService.count(countRequest);
+        } catch (Exception e) {
+            log.error("Failed to count logs for index [{}] and tail [{}]", 
esIndex, tail, e);
+            return 0L;
+        }
+    }
+
+    public boolean existsTemplate(EsService esService, String templateName) 
throws IOException {
+        IndexTemplatesExistRequest request = new 
IndexTemplatesExistRequest(templateName);
+        return esService.existsTemplate(request);
     }
 
     @Override
@@ -202,8 +244,8 @@ public class LogCountServiceImpl implements LogCountService {
     public void deleteHistoryLogCount() {
         Calendar calendar = Calendar.getInstance();
         calendar.add(Calendar.DAY_OF_MONTH, -70);
-        String deleteBeforedDay = new 
SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
-        logCountMapper.deleteBeforeDay(deleteBeforedDay);
+        String deleteBeforeDay = new 
SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
+        logCountMapper.deleteBeforeDay(deleteBeforeDay);
     }
 
     @Override
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
index b28925e3..bfa67e57 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
@@ -20,6 +20,11 @@ package org.apache.ozhera.log.manager.service.impl;
 
 import cn.hutool.core.lang.Assert;
 import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.app.api.response.AppBaseInfo;
 import org.apache.ozhera.app.model.vo.HeraEnvIpVo;
 import org.apache.ozhera.log.api.enums.*;
@@ -56,11 +61,6 @@ import org.apache.ozhera.log.manager.service.nacos.impl.StreamConfigNacosProvide
 import org.apache.ozhera.log.parse.LogParser;
 import org.apache.ozhera.log.parse.LogParserFactory;
 import org.apache.ozhera.log.utils.IndexUtils;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.anno.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.nutz.lang.Strings;
 
 import javax.annotation.Resource;
@@ -70,7 +70,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import static org.apache.ozhera.log.common.Constant.*;
-import static org.apache.ozhera.log.manager.common.Utils.getKeyValueList;
 
 @Slf4j
 @Service
@@ -329,7 +328,7 @@ public class LogTailServiceImpl extends BaseService implements LogTailService {
             MilogLogStoreDO logStore = logStoreDao.queryById(tail.getStoreId());
             if (null != logStore && StringUtils.isNotEmpty(logStore.getKeyList())) {
                 String keyList = logStore.getKeyList();
-                String valueList = getKeyValueList(keyList, tail.getValueList());
+                String valueList = IndexUtils.getKeyValueList(keyList, tail.getValueList());
                 tail.setValueList(valueList);
             }
             // Handle filterconf to rateLimit
@@ -391,6 +390,9 @@ public class LogTailServiceImpl extends BaseService implements LogTailService {
 
     @Override
     public Result<Void> updateMilogLogTail(LogTailParam param) {
+        if (null == param.getId()) {
+            return Result.failParam("id can not be null");
+        }
         MilogLogTailDo ret = milogLogtailDao.queryById(param.getId());
         if (ret == null) {
             return new Result<>(CommonError.ParamsError.getCode(), "tail does not exist");
@@ -804,7 +806,7 @@ public class LogTailServiceImpl extends BaseService implements LogTailService {
         String valueList = IndexUtils.getNumberValueList(logstoreDO.getKeyList(), mlogParseParam.getValueList());
         Long currentStamp = Instant.now().toEpochMilli();
         try {
-            LogParser logParser = LogParserFactory.getLogParser(mlogParseParam.getParseType(), keyList, valueList, mlogParseParam.getParseScript());
+            LogParser logParser = LogParserFactory.getLogParser(mlogParseParam.getParseType(), keyList, valueList, mlogParseParam.getParseScript(), logstoreDO.getKeyList());
             Map<String, Object> parseMsg = logParser.parseSimple(mlogParseParam.getMsg(), currentStamp);
             return Result.success(parseMsg);
         } catch (Exception e) {
@@ -819,7 +821,7 @@ public class LogTailServiceImpl extends BaseService implements LogTailService {
             return Result.failParam(checkMsg);
         }
         try {
-            LogParser logParser = LogParserFactory.getLogParser(mlogParseParam.getParseType(), "", "", mlogParseParam.getParseScript());
+            LogParser logParser = LogParserFactory.getLogParser(mlogParseParam.getParseType(), "", "", mlogParseParam.getParseScript(), "");
             List<String> parsedLog = logParser.parseLogData(mlogParseParam.getMsg());
             return Result.success(parsedLog);
         } catch (Exception e) {
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 964ff95b..354aaf55 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -25,6 +25,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.xiaomi.data.push.nacos.NacosNaming;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.ozhera.log.api.enums.LogStorageTypeEnum;
 import org.apache.ozhera.log.api.enums.MQSourceEnum;
 import org.apache.ozhera.log.api.enums.OperateEnum;
@@ -49,11 +54,6 @@ import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
 import org.apache.ozhera.log.manager.service.nacos.FetchStreamMachineService;
 import org.apache.ozhera.log.manager.service.nacos.MultipleNacosConfig;
 import org.apache.ozhera.log.manager.service.nacos.impl.*;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.anno.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.ozhera.log.model.*;
 
 import javax.annotation.Resource;
@@ -411,6 +411,7 @@ public class MilogConfigNacosServiceImpl implements MilogConfigNacosService {
 
             sinkConfig.setLogstoreName(logStoreDO.getLogstoreName());
             sinkConfig.setKeyList(Utils.parse2KeyAndTypeList(logStoreDO.getKeyList(), logStoreDO.getColumnTypeList()));
+            sinkConfig.setKeyOrderList(logStoreDO.getKeyList());
             MilogEsClusterDO esInfo = esCluster.getById(logStoreDO.getEsClusterId());
             if (null != esInfo) {
                 sinkConfig.setEsIndex(logStoreDO.getEsIndex());
diff --git a/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml b/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml
index d5132e59..ab033294 100644
--- a/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml
+++ b/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml
@@ -33,5 +33,7 @@ http://www.apache.org/licenses/LICENSE-2.0
             milog_logstore store
         WHERE
             tail.store_id = store.id
+          AND store.es_cluster_id IS NOT NULL
+          AND store.es_index IS NOT NULL
     </select>
 </mapper>
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
index 0e97d9b3..d73b5d3c 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
@@ -95,27 +95,30 @@ public class ConfigManager {
                 log.warn("[ConfigManager.initConfigManager] Nacos configuration [dataID:{},group:{}]not found,exit initConfigManager", spaceDataId, DEFAULT_GROUP_ID);
                 return;
             }
-            String uniqueMark = StreamUtils.getCurrentMachineMark();
+            String uniqueMarks = StreamUtils.getCurrentMachineMark();
             Map<String, Map<Long, String>> config = milogStreamConfig.getConfig();
-            if (config.containsKey(uniqueMark)) {
-                Map<Long, String> milogStreamDataMap = config.get(uniqueMark);
-                log.info("[ConfigManager.initConfigManager] uniqueMark:{},data:{}", uniqueMark, gson.toJson(milogStreamDataMap));
-                for (Long spaceId : milogStreamDataMap.keySet()) {
-                    final String dataId = milogStreamDataMap.get(spaceId);
-                    // init spaceData config
-                    String logSpaceDataStr = nacosConfig.getConfigStr(dataId, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
-                    if (StringUtils.isNotEmpty(logSpaceDataStr)) {
-                        MilogSpaceData milogSpaceData = GSON.fromJson(logSpaceDataStr, MilogSpaceData.class);
-                        if (null != milogSpaceData && !milogSpaceDataMap.containsKey(spaceId)) {
-                            MilogConfigListener configListener = new MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData, nacosConfig);
-                            addListener(spaceId, configListener);
-                            milogSpaceDataMap.put(spaceId, milogSpaceData);
-                            log.info("[ConfigManager.initStream] added log config listener for spaceId:{},dataId:{}", spaceId, dataId);
+            String[] split = uniqueMarks.split(SYMBOL_COMMA);
+            for (String uniqueMark : split) {
+                if (config.containsKey(uniqueMark)) {
+                    Map<Long, String> milogStreamDataMap = config.get(uniqueMark);
+                    log.info("[ConfigManager.initConfigManager] uniqueMark:{},data:{}", uniqueMark, gson.toJson(milogStreamDataMap));
+                    for (Long spaceId : milogStreamDataMap.keySet()) {
+                        final String dataId = milogStreamDataMap.get(spaceId);
+                        // init spaceData config
+                        String logSpaceDataStr = nacosConfig.getConfigStr(dataId, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
+                        if (StringUtils.isNotEmpty(logSpaceDataStr)) {
+                            MilogSpaceData milogSpaceData = GSON.fromJson(logSpaceDataStr, MilogSpaceData.class);
+                            if (null != milogSpaceData && !milogSpaceDataMap.containsKey(spaceId)) {
+                                MilogConfigListener configListener = new MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData, nacosConfig);
+                                addListener(spaceId, configListener);
+                                milogSpaceDataMap.put(spaceId, milogSpaceData);
+                                log.info("[ConfigManager.initStream] added log config listener for spaceId:{},dataId:{}", spaceId, dataId);
+                            }
                         }
                     }
+                } else {
+                    log.info("server start current not contain space config,uniqueMark:{}", uniqueMark);
                 }
-            } else {
-                log.info("server start current not contain space config,uniqueMark:{}", uniqueMark);
             }
         } catch (Exception e) {
             log.error("[ConfigManager.initStream] initStream exec err", e);
@@ -167,7 +170,7 @@ public class ConfigManager {
     private void processConfigForUniqueMark(String uniqueMark, Map<String, Map<Long, String>> config) {
         StreamCommonExtension extensionInstance = getStreamCommonExtensionInstance();
         if (!extensionInstance.checkUniqueMarkExists(uniqueMark, config)) {
-            log.warn("listen dataID:{},groupId:{},but receive config is empty", spaceDataId, DEFAULT_GROUP_ID);
+            log.warn("listen dataID:{},groupId:{},but receive config is empty,uniqueMarks:{}", spaceDataId, DEFAULT_GROUP_ID, uniqueMark);
             return;
         }
         Map<Long, String> dataIdMap = extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index 1a9227b1..3270852e 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -160,6 +160,7 @@ public class JobManager {
                 .tag(logtailConfig.getTag())
                 .index(sinkConfig.getEsIndex())
                 .keyList(sinkConfig.getKeyList())
+                .keyOrderList(sinkConfig.getKeyOrderList())
                 .valueList(logtailConfig.getValueList())
                 .parseScript(logtailConfig.getParseScript())
                 .logStoreName(sinkConfig.getLogstoreName())
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java
index 476ccc55..51658765 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java
@@ -18,13 +18,13 @@
  */
 package org.apache.ozhera.log.stream.job;
 
-import org.apache.ozhera.log.model.StorageInfo;
-import org.apache.ozhera.log.stream.common.SinkJobEnum;
-import org.apache.ozhera.log.stream.sink.SinkChain;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.ozhera.log.model.StorageInfo;
+import org.apache.ozhera.log.stream.common.SinkJobEnum;
+import org.apache.ozhera.log.stream.sink.SinkChain;
 
 import java.util.List;
 
@@ -47,6 +47,9 @@ public class SinkJobConfig extends LogConfig {
     private String tag;
     private String index;
     private String keyList;
+
+    private String keyOrderList;
+
     private String valueList;
     private String parseScript;
     private String logStoreName;
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
index 07047a8e..c3232dbc 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
@@ -18,14 +18,15 @@
  */
 package org.apache.ozhera.log.stream.job.extension.impl;
 
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.ozhera.log.model.LogtailConfig;
 import org.apache.ozhera.log.model.SinkConfig;
 import org.apache.ozhera.log.stream.job.extension.StreamCommonExtension;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
 
 import java.util.Map;
 
+import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
 import static org.apache.ozhera.log.stream.common.LogStreamConstants.DEFAULT_COMMON_STREAM_EXTENSION;
 
 /**
@@ -42,8 +43,14 @@ public class DefaultStreamCommonExtension implements StreamCommonExtension {
         return data;
     }
 
-    public Boolean checkUniqueMarkExists(String uniqueMark, Map<String, Map<Long, String>> config) {
-        return config.containsKey(uniqueMark);
+    public Boolean checkUniqueMarkExists(String uniqueMarks, Map<String, Map<Long, String>> config) {
+        String[] split = uniqueMarks.split(SYMBOL_COMMA);
+        for (String s : split) {
+            if (config.containsKey(s)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     public Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long, String>> config, String uniqueMark) {
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java
index aaea9c17..feb228e6 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ozhera.log.stream.job.extension.kafka;
 
+import com.xiaomi.youpin.docean.anno.Service;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.ozhera.log.parse.LogParser;
 import org.apache.ozhera.log.parse.LogParserFactory;
 import org.apache.ozhera.log.stream.common.LogStreamConstants;
@@ -29,8 +31,6 @@ import org.apache.ozhera.log.stream.job.extension.MessageSenderFactory;
 import org.apache.ozhera.log.stream.job.extension.SinkJob;
 import org.apache.ozhera.log.stream.job.extension.SinkJobProvider;
 import org.apache.ozhera.log.stream.sink.SinkChain;
-import com.xiaomi.youpin.docean.anno.Service;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 /**
  * @author wtt
@@ -51,7 +51,7 @@ public class KafkaSinkJobProvider implements SinkJobProvider {
         LogParser logParser = LogParserFactory.getLogParser(
                 sinkJobConfig.getParseType(), sinkJobConfig.getKeyList(), sinkJobConfig.getValueList(),
                 sinkJobConfig.getParseScript(), sinkJobConfig.getTopic(), sinkJobConfig.getTail(),
-                sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName());
+                sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName(), sinkJobConfig.getKeyOrderList());
 
         LogDataTransfer dataTransfer = new LogDataTransfer(sinkChain, logParser, messageSender, sinkJobConfig);
         dataTransfer.setJobType(jobType);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java
index 268c4723..a02e706e 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ozhera.log.stream.job.extension.rocketmq;
 
+import com.xiaomi.youpin.docean.anno.Service;
 import org.apache.ozhera.log.parse.LogParser;
 import org.apache.ozhera.log.parse.LogParserFactory;
 import org.apache.ozhera.log.stream.common.LogStreamConstants;
@@ -29,7 +30,6 @@ import org.apache.ozhera.log.stream.job.extension.MessageSenderFactory;
 import org.apache.ozhera.log.stream.job.extension.SinkJob;
 import org.apache.ozhera.log.stream.job.extension.SinkJobProvider;
 import org.apache.ozhera.log.stream.sink.SinkChain;
-import com.xiaomi.youpin.docean.anno.Service;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 
 /**
@@ -49,7 +49,7 @@ public class RocketMqSinkJobProvider implements SinkJobProvider {
         LogParser logParser = LogParserFactory.getLogParser(
                 sinkJobConfig.getParseType(), sinkJobConfig.getKeyList(), sinkJobConfig.getValueList(),
                 sinkJobConfig.getParseScript(), sinkJobConfig.getTopic(), sinkJobConfig.getTail(),
-                sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName());
+                sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName(), sinkJobConfig.getKeyOrderList());
 
         LogDataTransfer dataTransfer = new LogDataTransfer(sinkChain, logParser, messageSender, sinkJobConfig);
         dataTransfer.setJobType(jobType);
diff --git a/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java b/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java
index 090c42c2..f6f84962 100644
--- a/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java
+++ b/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java
@@ -19,11 +19,11 @@
 package org.apache.ozhera.log.stream;
 
 import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.api.model.msg.LineMessage;
 import org.apache.ozhera.log.parse.LogParser;
 import org.apache.ozhera.log.parse.LogParserFactory;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -92,7 +92,7 @@ public class TestSomething {
         //RmqSinkJob rmqSinkJob = new RmqSinkJob();
         Integer parserType = LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
 
-        LogParser customParse = LogParserFactory.getLogParser(parserType, keyList, valueList, parseScript, topicName, tailName, tag, logStoreName);
+        LogParser customParse = LogParserFactory.getLogParser(parserType, keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
 
 //        rmqSinkJob.setLogParser(customParse);
 
@@ -105,6 +105,6 @@ public class TestSomething {
         Map<String, Map<String, String>> testMap = new HashMap<>();
         Map<String, String> test = testMap.computeIfAbsent("test", k -> new HashMap<>());
         test.put("sfsdf", "ersrser");
-        log.info("result:{}",GSON.toJson(testMap));
+        log.info("result:{}", GSON.toJson(testMap));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to