This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new ef75b1ae refactor(log): refactoring log parsing logic (#551)
ef75b1ae is described below
commit ef75b1ae4f5cef12a6202330cd9588a93eeadadc
Author: wtt <[email protected]>
AuthorDate: Tue Feb 25 14:10:38 2025 +0800
refactor(log): refactoring log parsing logic (#551)
---
.../apache/ozhera/log/parse/AbstractLogParser.java | 25 +++++++++--
.../apache/ozhera/log/parse/RegexLogParser.java | 51 +++++++++++++++-------
.../ozhera/log/parse/SeparatorLogParser.java | 18 --------
.../apache/ozhera/log/common/LogParserTest.java | 34 ++++++++++++---
.../manager/service/impl/LogTailServiceImpl.java | 4 +-
.../ozhera/log/stream/plugin/es/EsPlugin.java | 10 ++---
6 files changed, 91 insertions(+), 51 deletions(-)
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index ebc260f6..09acb66c 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -23,12 +23,12 @@ import cn.hutool.core.util.ReflectUtil;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* @author wtt
@@ -43,9 +43,26 @@ public abstract class AbstractLogParser implements LogParser
{
private List<FieldInterceptor> fieldInterceptors = Lists.newArrayList();
+ protected Map<String, Integer> valueMap;
+
public AbstractLogParser(LogParserData parserData) {
this.parserData = parserData;
createFieldInterceptors();
+ this.valueMap = createValueMap(parseKeyValueList(parserData));
+ }
+
+ private List<String> parseKeyValueList(LogParserData parserData) {
+ if (StringUtils.isNotBlank(parserData.getKeyOrderList())) {
+ String keyValueList =
IndexUtils.getKeyValueList(parserData.getKeyOrderList(),
parserData.getValueList());
+ return Arrays.asList(splitList(keyValueList));
+ }
+ return Collections.emptyList();
+ }
+
+ private Map<String, Integer> createValueMap(List<String> valueList) {
+ return IntStream.range(0, valueList.size())
+ .boxed()
+ .collect(Collectors.toMap(valueList::get,
Function.identity()));
}
private void createFieldInterceptors() {
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
index 8ae9ea69..8173d439 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/RegexLogParser.java
@@ -18,13 +18,14 @@
*/
package org.apache.ozhera.log.parse;
-import org.apache.ozhera.log.utils.IndexUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* @author zhangjuan
@@ -36,8 +37,12 @@ public class RegexLogParser extends AbstractLogParser {
private Pattern pattern;
+ private Map<Integer, String> reversedMap;
+
public RegexLogParser(LogParserData parserData) {
super(parserData);
+ reversedMap = valueMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getValue,
Map.Entry::getKey));
pattern = Pattern.compile(parserData.getParseScript(),
Pattern.MULTILINE);
}
@@ -49,27 +54,32 @@ public class RegexLogParser extends AbstractLogParser {
@Override
public Map<String, Object> doParseSimple(String logData, Long
collectStamp) {
Map<String, Object> ret = new HashMap<>();
- if (logData == null || logData.length() == 0) {
+ if (logData == null || logData.isEmpty()) {
return ret;
}
try {
// A list of extracted contents by regular regex
List<String> logArray = parseLogData(logData);
- // Index the list of column names
- List<String> keyNameList =
IndexUtils.getKeyListSlice(parserData.getKeyList());
- // Each index column name corresponds to an array of index values
for the content in the regular extracted content list
- int[] valueIndexList =
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
- for (int i = 0; i < keyNameList.size(); i++) {
- // If the index of the key is outside the range of value, or
the index corresponding to value is -1, the current key is skipped
- if (i >= valueIndexList.length || valueIndexList[i] == -1) {
- continue;
- }
- // If the index of value does not exceed the regular parsed
content array, the key has a corresponding resolution value, otherwise it is ""
- String value = "";
- if (valueIndexList[i] < logArray.size()) {
- value = logArray.get(valueIndexList[i]);
+
+ if (null != valueMap && !valueMap.isEmpty()) {
+ parseWithValueMap(logArray, ret);
+ } else {
+ // Index the list of column names
+ List<String> keyNameList =
IndexUtils.getKeyListSlice(parserData.getKeyList());
+ // Each index column name corresponds to an array of index
values for the content in the regular extracted content list
+ int[] valueIndexList =
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
+ for (int i = 0; i < keyNameList.size(); i++) {
+ // If the index of the key is outside the range of value,
or the index corresponding to value is -1, the current key is skipped
+ if (i >= valueIndexList.length || valueIndexList[i] == -1)
{
+ continue;
+ }
+ // If the index of value does not exceed the regular
parsed content array, the key has a corresponding resolution value, otherwise
it is ""
+ String value = "";
+ if (valueIndexList[i] < logArray.size()) {
+ value = logArray.get(valueIndexList[i]);
+ }
+ ret.put(keyNameList.get(i), StringUtils.isNotEmpty(value)
? value.trim() : value);
}
- ret.put(keyNameList.get(i), StringUtils.isNotEmpty(value) ?
value.trim() : value);
}
validTimestamp(ret, collectStamp);
} catch (Exception e) {
@@ -79,6 +89,15 @@ public class RegexLogParser extends AbstractLogParser {
return ret;
}
+ private void parseWithValueMap(List<String> logArray, Map<String, Object>
ret) {
+ for (int i = 0; i < logArray.size(); i++) {
+ String key = reversedMap.get(i);
+ if (key != null) {
+ ret.put(key, logArray.get(i));
+ }
+ }
+ }
+
@Override
public List<String> parseLogData(String logData) throws Exception {
List<String> ret = new ArrayList<>();
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
index 44035e4d..0baa3a9f 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
@@ -37,29 +37,11 @@ public class SeparatorLogParser extends AbstractLogParser {
private String[] keysAndTypes;
private String[] values;
- private Map<String, Integer> valueMap;
-
public SeparatorLogParser(LogParserData parserData) {
super(parserData);
keysAndTypes = splitList(parserData.getKeyList());
values = splitList(parserData.getValueList());
-
- this.valueMap = createValueMap(parseKeyValueList(parserData));
- }
-
- private List<String> parseKeyValueList(LogParserData parserData) {
- if (StringUtils.isNotBlank(parserData.getKeyOrderList())) {
- String keyValueList =
IndexUtils.getKeyValueList(parserData.getKeyOrderList(),
parserData.getValueList());
- return Arrays.asList(splitList(keyValueList));
- }
- return Collections.emptyList();
- }
-
- private Map<String, Integer> createValueMap(List<String> valueList) {
- return IntStream.range(0, valueList.size())
- .boxed()
- .collect(Collectors.toMap(valueList::get,
Function.identity()));
}
@Override
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index d7499600..c6ab6ba4 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -66,20 +66,42 @@ public class LogParserTest {
@Test
public void test2() {
Stopwatch stopwatch = Stopwatch.createStarted();
- String keyList =
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:text,podName:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
- String valueList = "0,1,2,3,4,5,-1,6,-1";
- String parseScript = "|";
- String logData = "2025-02-13 16:52:09,325|INFO";
+ String keyList =
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,time:keyword,log_level:keyword,thread_name:keyword,log_name:keyword,trace_id:keyword,user_login_name:keyword,marker:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+ String keyOrderList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+ String valueList = "-1,7,0,1,2,3,4,5,6";
+ String parseScript =
"(?s)(?s)(?s)(?s)(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s\\+\\d{4})\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s([\\s\\S]*)";
+ String logData = "";
String ip = "127.0.0.1";
Long currentStamp = Instant.now().toEpochMilli();
- Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
- LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
+ Integer parserType =
LogParserFactory.LogParserEnum.REGEX_PARSE.getCode();
+ LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName,
keyOrderList);
Map<String, Object> parse = customParse.parse(logData, ip, 1l,
currentStamp, "");
System.out.println(parse);
stopwatch.stop();
log.info("cost time:{}", stopwatch.elapsed().toMillis());
}
+ @Test
+ public void test3() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ //String keyList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+ String keyList =
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,time:keyword,log_level:keyword,thread_name:keyword,log_name:keyword,trace_id:keyword,user_login_name:keyword,marker:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+ String keyOrderList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+ String valueList = "-1,7,0,1,2,3,4,5,6";
+ String parseScript =
"(?s)(?s)(?s)(?s)(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s\\+\\d{4})\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s([\\s\\S]*)";
+ String logData = "";
+ String ip = "127.0.0.1";
+ Long currentStamp = Instant.now().toEpochMilli();
+ Integer parserType =
LogParserFactory.LogParserEnum.REGEX_PARSE.getCode();
+ LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName,
keyOrderList);
+ Map<String, Object> parse = customParse.parse(logData, ip, 1l,
currentStamp, "");
+ System.out.println(parse);
+
+ System.out.println(customParse.getTimestampFromString("2023-08-25
10:46:09.239", currentStamp));
+ stopwatch.stop();
+ log.info("cost time:{}", stopwatch.elapsed().toMillis());
+ }
+
@Test
public void parseSimpleTest() {
Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
index bfa67e57..1236d7ee 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
@@ -390,8 +390,8 @@ public class LogTailServiceImpl extends BaseService
implements LogTailService {
@Override
public Result<Void> updateMilogLogTail(LogTailParam param) {
- if (null == param.getId()) {
- return Result.failParam("id can not be null");
+ if (null == param || null == param.getId()) {
+ return Result.failParam("parameter error,param:" + param);
}
MilogLogTailDo ret = milogLogtailDao.queryById(param.getId());
if (ret == null) {
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
index 0f50f493..08e3d0c8 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
@@ -23,9 +23,6 @@ import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.mone.es.EsProcessor;
-import org.apache.ozhera.log.common.Config;
-import org.apache.ozhera.log.model.StorageInfo;
-import org.apache.ozhera.log.stream.job.compensate.MqMessageDTO;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.plugin.es.EsProcessorConf;
import com.xiaomi.youpin.docean.plugin.es.EsService;
@@ -34,6 +31,9 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.ozhera.log.common.Config;
+import org.apache.ozhera.log.model.StorageInfo;
+import org.apache.ozhera.log.stream.job.compensate.MqMessageDTO;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -201,7 +201,7 @@ public class EsPlugin {
, x.getVersion()
, failure.getCause().getMessage()
);
- log.error("esInfo:{},Bulk executionId:[{}] has
error messages:{}", GSON.toJson(esInfo), executionId, msg);
+ log.error("esInfo:{},Bulk executionId:[{}] has
error messages:{}", GSON.toJson(esInfo), executionId, msg, failure.getCause());
count.incrementAndGet();
}
});
@@ -230,7 +230,7 @@ public class EsPlugin {
, clazz.getSimpleName()
, clazz.getTypeName()
, clazz.getCanonicalName()
- , failure.getMessage());
+ , failure);
sendMessageToTopic(request, esInfo, onFailedConsumer);
}
}));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]