This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new ef75b1ae refactor(log): refactoring log parsing logic (#551)
ef75b1ae is described below

commit ef75b1ae4f5cef12a6202330cd9588a93eeadadc
Author: wtt <[email protected]>
AuthorDate: Tue Feb 25 14:10:38 2025 +0800

    refactor(log): refactoring log parsing logic (#551)
---
 .../apache/ozhera/log/parse/AbstractLogParser.java | 25 +++++++++--
 .../apache/ozhera/log/parse/RegexLogParser.java    | 51 +++++++++++++++-------
 .../ozhera/log/parse/SeparatorLogParser.java       | 18 --------
 .../apache/ozhera/log/common/LogParserTest.java    | 34 ++++++++++++---
 .../manager/service/impl/LogTailServiceImpl.java   |  4 +-
 .../ozhera/log/stream/plugin/es/EsPlugin.java      | 10 ++---
 6 files changed, 91 insertions(+), 51 deletions(-)

diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index ebc260f6..09acb66c 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -23,12 +23,12 @@ import cn.hutool.core.util.ReflectUtil;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * @author wtt
@@ -43,9 +43,26 @@ public abstract class AbstractLogParser implements LogParser 
{
 
     private List<FieldInterceptor> fieldInterceptors = Lists.newArrayList();
 
+    protected Map<String, Integer> valueMap;
+
     public AbstractLogParser(LogParserData parserData) {
         this.parserData = parserData;
         createFieldInterceptors();
+        this.valueMap = createValueMap(parseKeyValueList(parserData));
+    }
+
+    private List<String> parseKeyValueList(LogParserData parserData) {
+        if (StringUtils.isNotBlank(parserData.getKeyOrderList())) {
+            String keyValueList = 
IndexUtils.getKeyValueList(parserData.getKeyOrderList(), 
parserData.getValueList());
+            return Arrays.asList(splitList(keyValueList));
+        }
+        return Collections.emptyList();
+    }
+
+    private Map<String, Integer> createValueMap(List<String> valueList) {
+        return IntStream.range(0, valueList.size())
+                .boxed()
+                .collect(Collectors.toMap(valueList::get, 
Function.identity()));
     }
 
     private void createFieldInterceptors() {
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
index 8ae9ea69..8173d439 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
@@ -18,13 +18,14 @@
  */
 package org.apache.ozhera.log.parse;
 
-import org.apache.ozhera.log.utils.IndexUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
 
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * @author zhangjuan
@@ -36,8 +37,12 @@ public class RegexLogParser extends AbstractLogParser {
 
     private Pattern pattern;
 
+    private Map<Integer, String> reversedMap;
+
     public RegexLogParser(LogParserData parserData) {
         super(parserData);
+        reversedMap = valueMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getValue, 
Map.Entry::getKey));
         pattern = Pattern.compile(parserData.getParseScript(), 
Pattern.MULTILINE);
     }
 
@@ -49,27 +54,32 @@ public class RegexLogParser extends AbstractLogParser {
     @Override
     public Map<String, Object> doParseSimple(String logData, Long 
collectStamp) {
         Map<String, Object> ret = new HashMap<>();
-        if (logData == null || logData.length() == 0) {
+        if (logData == null || logData.isEmpty()) {
             return ret;
         }
         try {
             // A list of extracted contents by regular regex
             List<String> logArray = parseLogData(logData);
-            // Index the list of column names
-            List<String> keyNameList = 
IndexUtils.getKeyListSlice(parserData.getKeyList());
-            // Each index column name corresponds to an array of index values 
for the content in the regular extracted content list
-            int[] valueIndexList = 
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
-            for (int i = 0; i < keyNameList.size(); i++) {
-                // If the index of the key is outside the range of value, or 
the index corresponding to value is -1, the current key is skipped
-                if (i >= valueIndexList.length || valueIndexList[i] == -1) {
-                    continue;
-                }
-                // If the index of value does not exceed the regular parsed 
content array, the key has a corresponding resolution value, otherwise it is ""
-                String value = "";
-                if (valueIndexList[i] < logArray.size()) {
-                    value = logArray.get(valueIndexList[i]);
+
+            if (null != valueMap && !valueMap.isEmpty()) {
+                parseWithValueMap(logArray, ret);
+            } else {
+                // Index the list of column names
+                List<String> keyNameList = 
IndexUtils.getKeyListSlice(parserData.getKeyList());
+                // Each index column name corresponds to an array of index 
values for the content in the regular extracted content list
+                int[] valueIndexList = 
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
+                for (int i = 0; i < keyNameList.size(); i++) {
+                    // If the index of the key is outside the range of value, 
or the index corresponding to value is -1, the current key is skipped
+                    if (i >= valueIndexList.length || valueIndexList[i] == -1) 
{
+                        continue;
+                    }
+                    // If the index of value does not exceed the regular 
parsed content array, the key has a corresponding resolution value, otherwise 
it is ""
+                    String value = "";
+                    if (valueIndexList[i] < logArray.size()) {
+                        value = logArray.get(valueIndexList[i]);
+                    }
+                    ret.put(keyNameList.get(i), StringUtils.isNotEmpty(value) 
? value.trim() : value);
                 }
-                ret.put(keyNameList.get(i), StringUtils.isNotEmpty(value) ? 
value.trim() : value);
             }
             validTimestamp(ret, collectStamp);
         } catch (Exception e) {
@@ -79,6 +89,15 @@ public class RegexLogParser extends AbstractLogParser {
         return ret;
     }
 
+    private void parseWithValueMap(List<String> logArray, Map<String, Object> 
ret) {
+        for (int i = 0; i < logArray.size(); i++) {
+            String key = reversedMap.get(i);
+            if (key != null) {
+                ret.put(key, logArray.get(i));
+            }
+        }
+    }
+
     @Override
     public List<String> parseLogData(String logData) throws Exception {
         List<String> ret = new ArrayList<>();
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
index 44035e4d..0baa3a9f 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
@@ -37,29 +37,11 @@ public class SeparatorLogParser extends AbstractLogParser {
     private String[] keysAndTypes;
     private String[] values;
 
-    private Map<String, Integer> valueMap;
-
     public SeparatorLogParser(LogParserData parserData) {
         super(parserData);
 
         keysAndTypes = splitList(parserData.getKeyList());
         values = splitList(parserData.getValueList());
-
-        this.valueMap = createValueMap(parseKeyValueList(parserData));
-    }
-
-    private List<String> parseKeyValueList(LogParserData parserData) {
-        if (StringUtils.isNotBlank(parserData.getKeyOrderList())) {
-            String keyValueList = 
IndexUtils.getKeyValueList(parserData.getKeyOrderList(), 
parserData.getValueList());
-            return Arrays.asList(splitList(keyValueList));
-        }
-        return Collections.emptyList();
-    }
-
-    private Map<String, Integer> createValueMap(List<String> valueList) {
-        return IntStream.range(0, valueList.size())
-                .boxed()
-                .collect(Collectors.toMap(valueList::get, 
Function.identity()));
     }
 
     @Override
diff --git 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index d7499600..c6ab6ba4 100644
--- 
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++ 
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -66,20 +66,42 @@ public class LogParserTest {
     @Test
     public void test2() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        String keyList = 
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:text,podName:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
-        String valueList = "0,1,2,3,4,5,-1,6,-1";
-        String parseScript = "|";
-        String logData = "2025-02-13 16:52:09,325|INFO";
+        String keyList = 
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,time:keyword,log_level:keyword,thread_name:keyword,log_name:keyword,trace_id:keyword,user_login_name:keyword,marker:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+        String keyOrderList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+        String valueList = "-1,7,0,1,2,3,4,5,6";
+        String parseScript = 
"(?s)(?s)(?s)(?s)(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s\\+\\d{4})\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s([\\s\\S]*)";
+        String logData = "";
         String ip = "127.0.0.1";
         Long currentStamp = Instant.now().toEpochMilli();
-        Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
-        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
+        Integer parserType = 
LogParserFactory.LogParserEnum.REGEX_PARSE.getCode();
+        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, 
keyOrderList);
         Map<String, Object> parse = customParse.parse(logData, ip, 1l, 
currentStamp, "");
         System.out.println(parse);
         stopwatch.stop();
         log.info("cost time:{}", stopwatch.elapsed().toMillis());
     }
 
+    @Test
+    public void test3() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        //String keyList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+        String keyList = 
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,time:keyword,log_level:keyword,thread_name:keyword,log_name:keyword,trace_id:keyword,user_login_name:keyword,marker:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+        String keyOrderList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+        String valueList = "-1,7,0,1,2,3,4,5,6";
+        String parseScript = 
"(?s)(?s)(?s)(?s)(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s\\+\\d{4})\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s([\\s\\S]*)";
+        String logData = "";
+        String ip = "127.0.0.1";
+        Long currentStamp = Instant.now().toEpochMilli();
+        Integer parserType = 
LogParserFactory.LogParserEnum.REGEX_PARSE.getCode();
+        LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, 
keyOrderList);
+        Map<String, Object> parse = customParse.parse(logData, ip, 1l, 
currentStamp, "");
+        System.out.println(parse);
+
+        System.out.println(customParse.getTimestampFromString("2023-08-25 
10:46:09.239", currentStamp));
+        stopwatch.stop();
+        log.info("cost time:{}", stopwatch.elapsed().toMillis());
+    }
+
     @Test
     public void parseSimpleTest() {
         Integer parserType = 
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
index bfa67e57..1236d7ee 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
@@ -390,8 +390,8 @@ public class LogTailServiceImpl extends BaseService 
implements LogTailService {
 
     @Override
     public Result<Void> updateMilogLogTail(LogTailParam param) {
-        if (null == param.getId()) {
-            return Result.failParam("id can not be null");
+        if (null == param || null == param.getId()) {
+            return Result.failParam("parameter error,param:" + param);
         }
         MilogLogTailDo ret = milogLogtailDao.queryById(param.getId());
         if (ret == null) {
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
index 0f50f493..08e3d0c8 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
@@ -23,9 +23,6 @@ import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.xiaomi.mone.es.EsProcessor;
-import org.apache.ozhera.log.common.Config;
-import org.apache.ozhera.log.model.StorageInfo;
-import org.apache.ozhera.log.stream.job.compensate.MqMessageDTO;
 import com.xiaomi.youpin.docean.anno.Service;
 import com.xiaomi.youpin.docean.plugin.es.EsProcessorConf;
 import com.xiaomi.youpin.docean.plugin.es.EsService;
@@ -34,6 +31,9 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.ozhera.log.common.Config;
+import org.apache.ozhera.log.model.StorageInfo;
+import org.apache.ozhera.log.stream.job.compensate.MqMessageDTO;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -201,7 +201,7 @@ public class EsPlugin {
                                     , x.getVersion()
                                     , failure.getCause().getMessage()
                             );
-                            log.error("esInfo:{},Bulk executionId:[{}] has 
error messages:{}", GSON.toJson(esInfo), executionId, msg);
+                            log.error("esInfo:{},Bulk executionId:[{}] has 
error messages:{}", GSON.toJson(esInfo), executionId, msg, failure.getCause());
                             count.incrementAndGet();
                         }
                     });
@@ -230,7 +230,7 @@ public class EsPlugin {
                         , clazz.getSimpleName()
                         , clazz.getTypeName()
                         , clazz.getCanonicalName()
-                        , failure.getMessage());
+                        , failure);
                 sendMessageToTopic(request, esInfo, onFailedConsumer);
             }
         }));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to