This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 4404f158 feat: optimize log parsing and statistics functions (#548)
4404f158 is described below
commit 4404f1587bf5e26f384989d368d4fb885fd616a4
Author: wtt <[email protected]>
AuthorDate: Tue Feb 18 16:42:17 2025 +0800
feat: optimize log parsing and statistics functions (#548)
* feat(log): optimize log parsing and statistics functions
* refactor: optimize log parsing and configuration services
---
.../org/apache/ozhera/log/model/SinkConfig.java | 4 +
.../apache/ozhera/log/parse/AbstractLogParser.java | 5 ++
.../org/apache/ozhera/log/parse/LogParserData.java | 4 +
.../apache/ozhera/log/parse/LogParserFactory.java | 7 +-
.../ozhera/log/parse/SeparatorLogParser.java | 82 +++++++++++++-------
.../org/apache/ozhera/log/utils/IndexUtils.java | 2 +-
.../apache/ozhera/log/common/IndexUtilsTest.java | 4 +-
.../ozhera/log/common/LogParserFactoryTest.java | 2 +-
.../apache/ozhera/log/common/LogParserTest.java | 31 ++++++--
.../ozhera/log/manager/job/TailLogCountJob.java | 12 ++-
.../service/impl/AgentConfigServiceImpl.java | 9 ++-
.../manager/service/impl/LogCountServiceImpl.java | 88 ++++++++++++++++------
.../manager/service/impl/LogTailServiceImpl.java | 20 ++---
.../service/impl/MilogConfigNacosServiceImpl.java | 11 +--
.../main/resources/mapper/MilogLogstailMapper.xml | 2 +
.../ozhera/log/stream/config/ConfigManager.java | 39 +++++-----
.../apache/ozhera/log/stream/job/JobManager.java | 1 +
.../ozhera/log/stream/job/SinkJobConfig.java | 9 ++-
.../impl/DefaultStreamCommonExtension.java | 15 +++-
.../job/extension/kafka/KafkaSinkJobProvider.java | 6 +-
.../rocketmq/RocketMqSinkJobProvider.java | 4 +-
.../apache/ozhera/log/stream/TestSomething.java | 8 +-
22 files changed, 240 insertions(+), 125 deletions(-)
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
index d04e5031..ce991a0e 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
@@ -35,6 +35,9 @@ public class SinkConfig {
* timestamp is required
*/
private String keyList;
+
+ private String keyOrderList;
+
/**
* key:logtailId
*/
@@ -56,6 +59,7 @@ public class SinkConfig {
this.logstoreId = sinkConfig.getLogstoreId();
this.logstoreName = sinkConfig.getLogstoreName();
this.keyList = sinkConfig.getKeyList();
+ this.keyOrderList = sinkConfig.getKeyOrderList();
this.esIndex = sinkConfig.getEsIndex();
this.esInfo = sinkConfig.getEsInfo();
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index b500173a..ebc260f6 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -143,6 +143,11 @@ public abstract class AbstractLogParser implements
LogParser {
}
}
+ protected String[] splitList(String keyList) {
+ return StringUtils.split(keyList, ",");
+ }
+
+
public abstract Map<String, Object> doParse(String logData, String ip,
Long lineNum, Long collectStamp, String fileName);
public abstract Map<String, Object> doParseSimple(String logData, Long
collectStamp);
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
index 211d3cea..6557d809 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserData.java
@@ -33,6 +33,10 @@ import lombok.*;
@Builder
public class LogParserData {
private String keyList;
+
+ //Use this field to parse content more easily
+ private String keyOrderList;
+
private String valueList;
private String parseScript;
private String topicName;
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
index 36edef25..6e4ea33f 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
@@ -34,13 +34,14 @@ public class LogParserFactory {
private LogParserFactory() {
}
- public static LogParser getLogParser(Integer parseType, String keyList,
String valueList, String parseScript) {
- return LogParserFactory.getLogParser(parseType, keyList, valueList,
parseScript, "", "", "", "");
+ public static LogParser getLogParser(Integer parseType, String keyList,
String valueList, String parseScript, String keyOrderList) {
+ return LogParserFactory.getLogParser(parseType, keyList, valueList,
parseScript, "", "", "", "", keyOrderList);
}
public static LogParser getLogParser(Integer parseType, String keyList,
String valueList, String parseScript,
- String topicName, String tailName,
String mqTag, String logStoreName) {
+ String topicName, String tailName,
String mqTag, String logStoreName, String keyOrderList) {
LogParserData logParserData = LogParserData.builder().keyList(keyList)
+ .keyOrderList(keyOrderList)
.valueList(valueList)
.parseScript(parseScript)
.topicName(topicName)
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
index 9c5e142d..44035e4d 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
@@ -20,13 +20,12 @@ package org.apache.ozhera.log.parse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* @author wtt
@@ -38,10 +37,29 @@ public class SeparatorLogParser extends AbstractLogParser {
private String[] keysAndTypes;
private String[] values;
+ private Map<String, Integer> valueMap;
+
public SeparatorLogParser(LogParserData parserData) {
super(parserData);
- keysAndTypes = StringUtils.split(parserData.getKeyList(), ",");
- values = StringUtils.split(parserData.getValueList(), ",");
+
+ keysAndTypes = splitList(parserData.getKeyList());
+ values = splitList(parserData.getValueList());
+
+ this.valueMap = createValueMap(parseKeyValueList(parserData));
+ }
+
+ private List<String> parseKeyValueList(LogParserData parserData) {
+ if (StringUtils.isNotBlank(parserData.getKeyOrderList())) {
+ String keyValueList =
IndexUtils.getKeyValueList(parserData.getKeyOrderList(),
parserData.getValueList());
+ return Arrays.asList(splitList(keyValueList));
+ }
+ return Collections.emptyList();
+ }
+
+ private Map<String, Integer> createValueMap(List<String> valueList) {
+ return IntStream.range(0, valueList.size())
+ .boxed()
+ .collect(Collectors.toMap(valueList::get,
Function.identity()));
}
@Override
@@ -60,7 +78,7 @@ public class SeparatorLogParser extends AbstractLogParser {
}
try {
- int maxLength = (int) Arrays.stream(values).filter(s ->
!s.equals("-1")).count();
+ int maxLength = (int) Arrays.stream(values).filter(s ->
!"-1".equals(s)).count();
List<String> logArray = parseLogData(logData, maxLength);
if (0 == maxLength) {
@@ -81,7 +99,7 @@ public class SeparatorLogParser extends AbstractLogParser {
*/
for (int i = 0; i < keysAndTypes.length; i++) {
String[] kTsplit = keysAndTypes[i].split(":");
- if (kTsplit.length != 2 || i >= values.length) {
+ if (kTsplit.length != 2 || i >= values.length && (null ==
valueMap || valueMap.isEmpty())) {
continue;
}
if (kTsplit[0].equals(esKeyMap_topic)) {
@@ -104,27 +122,35 @@ public class SeparatorLogParser extends AbstractLogParser
{
count++;
continue;
}
- String value = null;
- int num = -1;
- try {
- num = Integer.parseInt(values[i]);
- if (num == -1) {
- valueCount++;
- continue;
+ if (null != valueMap && !valueMap.isEmpty()) {
+ String key = kTsplit[0].trim();
+ if (valueMap.containsKey(key)) {
+ String value = logArray.get(valueMap.get(key));
+ ret.put(key, value);
}
- } catch (Exception e) {
- continue;
- }
- if (num < logArray.size() && num > -1) {
- value = logArray.get(num);
- } else {
- value = "";
- }
- if (kTsplit[0].equals(esKeyMap_timestamp) ||
kTsplit[1].equalsIgnoreCase(esKeyMap_Date)) {
- Long time = getTimestampFromString(value, collectStamp);
- ret.put(esKeyMap_timestamp, time);
} else {
- ret.put(kTsplit[0], StringUtils.isNotEmpty(value) ?
value.trim() : value);
+ String value = null;
+ int num = -1;
+ try {
+ num = Integer.parseInt(values[i]);
+ if (num == -1) {
+ valueCount++;
+ continue;
+ }
+ } catch (Exception e) {
+ continue;
+ }
+ if (num < logArray.size() && num > -1) {
+ value = logArray.get(num);
+ } else {
+ value = "";
+ }
+ if (kTsplit[0].equals(esKeyMap_timestamp) ||
kTsplit[1].equalsIgnoreCase(esKeyMap_Date)) {
+ Long time = getTimestampFromString(value,
collectStamp);
+ ret.put(esKeyMap_timestamp, time);
+ } else {
+ ret.put(kTsplit[0], StringUtils.isNotEmpty(value) ?
value.trim() : value);
+ }
}
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
index ac60bd10..98b263d3 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/IndexUtils.java
@@ -43,7 +43,7 @@ public class IndexUtils {
Map<Integer, String> map = new HashMap<>();
for (int i = 0; i < valueS.length; i++) {
int orderValue = Integer.parseInt(valueS[i]);
- if (orderValue >= 0) {
+ if (orderValue >= 0 && i < keyListSlice.size()) {
map.put(orderValue, keyListSlice.get(i));
}
}
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
index d20a3d67..c3b17031 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/IndexUtilsTest.java
@@ -33,8 +33,8 @@ public class IndexUtilsTest {
@Test
public void testGetKeyValueList(){
- String keyList =
"timestamp:1,level:1,traceId:1,threadName:1,className:1,message:1,line:1,methodName:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3";
- String valueList = "0,2,1,6,3,4,5,-1";
+ String keyList =
"timestamp:1,level:1,traceId:1,threadName:1,className:1,line:1,message:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3";
+ String valueList = "0,1,2,3,4,5,6,7";
String keyValueList = IndexUtils.getKeyValueList(keyList, valueList);
log.info(keyValueList);
}
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
index c7842868..843a2ebf 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserFactoryTest.java
@@ -52,7 +52,7 @@ public class LogParserFactoryTest {
String mqTag = "fsfsd";
String logStoreName = "testet";
String message = "";
- LogParser logParser = LogParserFactory.getLogParser(parseType,
keyList, valueList, parseScript, topicName, tailName, mqTag, logStoreName);
+ LogParser logParser = LogParserFactory.getLogParser(parseType,
keyList, valueList, parseScript, topicName, tailName, mqTag, logStoreName,"");
LineMessage lineMessage = Constant.GSON.fromJson(message,
LineMessage.class);
if(lineMessage != null) {
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index 9026b535..d7499600 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -19,11 +19,12 @@
package org.apache.ozhera.log.common;
import com.google.common.base.Stopwatch;
-import org.apache.ozhera.log.parse.LogParser;
-import org.apache.ozhera.log.parse.LogParserFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateParser;
import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.ozhera.log.parse.LogParser;
+import org.apache.ozhera.log.parse.LogParserFactory;
+import org.junit.Assert;
import org.junit.Test;
import java.time.Instant;
@@ -53,7 +54,7 @@ public class LogParserTest {
String ip = "127.0.0.1";
Long currentStamp = Instant.now().toEpochMilli();
Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
- LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName);
+ LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
Map<String, Object> parse = customParse.parse(logData, ip, 1l,
currentStamp, "");
System.out.println(parse);
@@ -65,20 +66,34 @@ public class LogParserTest {
@Test
public void test2() {
Stopwatch stopwatch = Stopwatch.createStarted();
- String keyList =
"message:text,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
- String valueList = "0";
+ String keyList =
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:text,podName:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+ String valueList = "0,1,2,3,4,5,-1,6,-1";
String parseScript = "|";
- String logData = "";
+ String logData = "2025-02-13 16:52:09,325|INFO";
String ip = "127.0.0.1";
Long currentStamp = Instant.now().toEpochMilli();
- Integer parserType =
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
- LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName);
+ Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
+ LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
Map<String, Object> parse = customParse.parse(logData, ip, 1l,
currentStamp, "");
System.out.println(parse);
stopwatch.stop();
log.info("cost time:{}", stopwatch.elapsed().toMillis());
}
+ @Test
+ public void parseSimpleTest() {
+ Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
+ String keyList =
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,datetime:date,project_name:keyword,client_ip:keyword,level:keyword,log_id:keyword,url:keyword,up_ip:keyword,logger_line:keyword,thread:keyword,biz_id:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+ String leyOrderList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,datetime:1,project_name:1,client_ip:1,level:1,log_id:1,url:1,up_ip:1,logger_line:1,thread:1,biz_id:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+ String valueList = "-1,10,0,1,2,3,4,5,6,7,8,9";
+ String parseScript = "]|[";
+ String message = "2025-02-12T20:11:02.501+0800]";
+ Long collectStamp = Instant.now().toEpochMilli();
+ LogParser logParser = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, leyOrderList);
+ Map<String, Object> parseMsg = logParser.parseSimple(message,
collectStamp);
+ Assert.assertNotNull(parseMsg);
+ }
+
@Test
public void test() {
System.out.println("1.647590227174E12".length());
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
index ca77cc5a..1241cba2 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/job/TailLogCountJob.java
@@ -19,11 +19,11 @@
package org.apache.ozhera.log.manager.job;
import cn.hutool.core.thread.ThreadUtil;
-import org.apache.ozhera.log.manager.service.impl.LogCountServiceImpl;
-import org.apache.ozhera.log.utils.DateUtils;
import com.xiaomi.youpin.docean.anno.Component;
import com.xiaomi.youpin.docean.plugin.config.anno.Value;
import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.manager.service.impl.LogCountServiceImpl;
+import org.apache.ozhera.log.utils.DateUtils;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
@@ -46,11 +46,9 @@ public class TailLogCountJob {
ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(
ThreadUtil.newNamedThreadFactory("log-tailLogCountJob", false)
);
- long initDelay = 0;
- long intervalTime = 1;
- scheduledExecutor.scheduleAtFixedRate(() -> {
- statisticsAll();
- }, initDelay, intervalTime, TimeUnit.HOURS);
+ long initDelay = 2;
+ long intervalTime = 2 * 60;
+ scheduledExecutor.scheduleAtFixedRate(this::statisticsAll, initDelay,
intervalTime, TimeUnit.MINUTES);
}
public void statisticsAll() {
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
index d14e7857..12c247f1 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/AgentConfigServiceImpl.java
@@ -18,13 +18,14 @@
*/
package org.apache.ozhera.log.manager.service.impl;
-import com.google.gson.Gson;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
import org.apache.ozhera.log.api.service.AgentConfigService;
import org.apache.ozhera.log.manager.service.extension.agent.MilogAgentService;
import
org.apache.ozhera.log.manager.service.extension.agent.MilogAgentServiceFactory;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.ozhera.log.common.Constant.GSON;
/**
* @author wtt
@@ -59,7 +60,7 @@ public class AgentConfigServiceImpl implements
AgentConfigService {
init();
}
LogCollectMeta logCollectMeta =
milogAgentService.getLogCollectMetaFromManager(ip);
- log.info("getLogCollectMetaFromManager end:{} {} {}", ip, new
Gson().toJson(logCollectMeta), (System.currentTimeMillis() - begin));
+ log.info("getLogCollectMetaFromManager end:{} {} {}", ip,
GSON.toJson(logCollectMeta), (System.currentTimeMillis() - begin));
return logCollectMeta;
} catch (Exception e) {
log.error("getLogCollectMetaFromManager error,ip:{}", ip, e);
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
index c828b2ba..b2df9182 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
@@ -18,6 +18,13 @@
*/
package org.apache.ozhera.log.manager.service.impl;
+import cn.hutool.core.collection.ListUtil;
+import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.StringUtils;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.ozhera.log.common.Result;
import org.apache.ozhera.log.manager.domain.EsCluster;
import org.apache.ozhera.log.manager.domain.Tpc;
@@ -30,13 +37,8 @@ import
org.apache.ozhera.log.manager.model.dto.SpaceCollectTrendDTO;
import org.apache.ozhera.log.manager.model.pojo.LogCountDO;
import org.apache.ozhera.log.manager.service.LogCountService;
import org.apache.ozhera.log.utils.DateUtils;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.StringUtils;
-import com.xiaomi.youpin.docean.plugin.es.EsService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.elasticsearch.client.core.CountRequest;
-import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -147,11 +149,25 @@ public class LogCountServiceImpl implements
LogCountService {
}
Long thisDayFirstMillisecond =
DateUtils.getThisDayFirstMillisecond(thisDay);
List<LogCountDO> logCountDOList = new ArrayList();
- LogCountDO logCountDO;
+ List<Map<String, Object>> tailList =
logtailMapper.getAllTailForCount();
+ Map<Long, List<String>> existIndexMap = new HashMap<>();
+ long res = 0;
+ if (tailList.size() > 2000) {
+ List<List<Map<String, Object>>> partitionList =
ListUtil.partition(tailList, 1000);
+ for (List<Map<String, Object>> mapList : partitionList) {
+ res += calculateAndInsertLogCounts(thisDay, mapList,
existIndexMap, thisDayFirstMillisecond, logCountDOList);
+ }
+ } else {
+ res = calculateAndInsertLogCounts(thisDay, tailList,
existIndexMap, thisDayFirstMillisecond, logCountDOList);
+ }
+ log.info("End of statistics log,Should be counted{},Total
statistics{}", tailList.size(), res);
+ }
+
+ private long calculateAndInsertLogCounts(String thisDay, List<Map<String,
Object>> tailList, Map<Long, List<String>> existIndexMap, Long
thisDayFirstMillisecond, List<LogCountDO> logCountDOList) {
+ String esIndex;
EsService esService;
Long total;
- String esIndex;
- List<Map<String, Object>> tailList =
logtailMapper.getAllTailForCount();
+ LogCountDO logCountDO;
for (Map<String, Object> tail : tailList) {
try {
esIndex = String.valueOf(tail.get("es_index"));
@@ -159,21 +175,24 @@ public class LogCountServiceImpl implements
LogCountService {
total = 0l;
esIndex = "";
} else {
- esService =
esCluster.getEsService(Long.parseLong(String.valueOf(tail.get("es_cluster_id"))));
+ long clusterId =
Long.parseLong(String.valueOf(tail.get("es_cluster_id")));
+ esService = esCluster.getEsService(clusterId);
if (esService == null) {
log.warn("Statistics logs warn,tail:{} the logs are
not counted and the ES client is not generated", tail);
continue;
}
- SearchSourceBuilder builder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
- boolQueryBuilder.filter(QueryBuilders.termQuery("tail",
tail.get("tail")));
-
boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp").from(thisDayFirstMillisecond).to(thisDayFirstMillisecond
+ DateUtils.dayms - 1));
- builder.query(boolQueryBuilder);
- // statistics
- CountRequest countRequest = new CountRequest();
- countRequest.indices(esIndex);
- countRequest.source(builder);
- total = esService.count(countRequest);
+
+ existIndexMap.computeIfAbsent(clusterId, k -> new
ArrayList<>());
+ List<String> clusterIndexes = existIndexMap.get(clusterId);
+
+ if (!clusterIndexes.contains(esIndex)) {
+ if (existsTemplate(esService, esIndex)) {
+ clusterIndexes.add(esIndex);
+ } else {
+ continue;
+ }
+ }
+ total = countLogs(esService, esIndex, tail,
thisDayFirstMillisecond);
}
logCountDO = new LogCountDO();
logCountDO.setTailId(Long.parseLong(String.valueOf(tail.get("id"))));
@@ -189,7 +208,30 @@ public class LogCountServiceImpl implements
LogCountService {
if (CollectionUtils.isNotEmpty(logCountDOList)) {
res = logCountMapper.batchInsert(logCountDOList);
}
- log.info("End of statistics log,Should be counted{},Total
statistics{}", tailList.size(), res);
+ return res;
+ }
+
+ private Long countLogs(EsService esService, String esIndex, Map<String,
Object> tail, Long startTime) {
+ SearchSourceBuilder builder = new SearchSourceBuilder();
+ builder.query(QueryBuilders.boolQuery()
+ .filter(QueryBuilders.termQuery("tailId", tail.get("id")))
+
.filter(QueryBuilders.rangeQuery("timestamp").from(startTime).to(startTime +
DateUtils.dayms - 1))
+ );
+
+ CountRequest countRequest = new CountRequest(esIndex);
+ countRequest.source(builder);
+
+ try {
+ return esService.count(countRequest);
+ } catch (Exception e) {
+ log.error("Failed to count logs for index [{}] and tail [{}]",
esIndex, tail, e);
+ return 0L;
+ }
+ }
+
+ public boolean existsTemplate(EsService esService, String templateName)
throws IOException {
+ IndexTemplatesExistRequest request = new
IndexTemplatesExistRequest(templateName);
+ return esService.existsTemplate(request);
}
@Override
@@ -202,8 +244,8 @@ public class LogCountServiceImpl implements LogCountService
{
public void deleteHistoryLogCount() {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, -70);
- String deleteBeforedDay = new
SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
- logCountMapper.deleteBeforeDay(deleteBeforedDay);
+ String deleteBeforeDay = new
SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
+ logCountMapper.deleteBeforeDay(deleteBeforeDay);
}
@Override
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
index b28925e3..bfa67e57 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
@@ -20,6 +20,11 @@ package org.apache.ozhera.log.manager.service.impl;
import cn.hutool.core.lang.Assert;
import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.app.api.response.AppBaseInfo;
import org.apache.ozhera.app.model.vo.HeraEnvIpVo;
import org.apache.ozhera.log.api.enums.*;
@@ -56,11 +61,6 @@ import
org.apache.ozhera.log.manager.service.nacos.impl.StreamConfigNacosProvide
import org.apache.ozhera.log.parse.LogParser;
import org.apache.ozhera.log.parse.LogParserFactory;
import org.apache.ozhera.log.utils.IndexUtils;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.anno.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.nutz.lang.Strings;
import javax.annotation.Resource;
@@ -70,7 +70,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import static org.apache.ozhera.log.common.Constant.*;
-import static org.apache.ozhera.log.manager.common.Utils.getKeyValueList;
@Slf4j
@Service
@@ -329,7 +328,7 @@ public class LogTailServiceImpl extends BaseService
implements LogTailService {
MilogLogStoreDO logStore =
logStoreDao.queryById(tail.getStoreId());
if (null != logStore &&
StringUtils.isNotEmpty(logStore.getKeyList())) {
String keyList = logStore.getKeyList();
- String valueList = getKeyValueList(keyList,
tail.getValueList());
+ String valueList = IndexUtils.getKeyValueList(keyList,
tail.getValueList());
tail.setValueList(valueList);
}
// Handle filterconf to rateLimit
@@ -391,6 +390,9 @@ public class LogTailServiceImpl extends BaseService
implements LogTailService {
@Override
public Result<Void> updateMilogLogTail(LogTailParam param) {
+ if (null == param.getId()) {
+ return Result.failParam("id can not be null");
+ }
MilogLogTailDo ret = milogLogtailDao.queryById(param.getId());
if (ret == null) {
return new Result<>(CommonError.ParamsError.getCode(), "tail does
not exist");
@@ -804,7 +806,7 @@ public class LogTailServiceImpl extends BaseService
implements LogTailService {
String valueList =
IndexUtils.getNumberValueList(logstoreDO.getKeyList(),
mlogParseParam.getValueList());
Long currentStamp = Instant.now().toEpochMilli();
try {
- LogParser logParser =
LogParserFactory.getLogParser(mlogParseParam.getParseType(), keyList,
valueList, mlogParseParam.getParseScript());
+ LogParser logParser =
LogParserFactory.getLogParser(mlogParseParam.getParseType(), keyList,
valueList, mlogParseParam.getParseScript(), logstoreDO.getKeyList());
Map<String, Object> parseMsg =
logParser.parseSimple(mlogParseParam.getMsg(), currentStamp);
return Result.success(parseMsg);
} catch (Exception e) {
@@ -819,7 +821,7 @@ public class LogTailServiceImpl extends BaseService
implements LogTailService {
return Result.failParam(checkMsg);
}
try {
- LogParser logParser =
LogParserFactory.getLogParser(mlogParseParam.getParseType(), "", "",
mlogParseParam.getParseScript());
+ LogParser logParser =
LogParserFactory.getLogParser(mlogParseParam.getParseType(), "", "",
mlogParseParam.getParseScript(), "");
List<String> parsedLog =
logParser.parseLogData(mlogParseParam.getMsg());
return Result.success(parsedLog);
} catch (Exception e) {
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 964ff95b..354aaf55 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -25,6 +25,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.xiaomi.data.push.nacos.NacosNaming;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.ozhera.log.api.enums.LogStorageTypeEnum;
import org.apache.ozhera.log.api.enums.MQSourceEnum;
import org.apache.ozhera.log.api.enums.OperateEnum;
@@ -49,11 +54,6 @@ import
org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
import org.apache.ozhera.log.manager.service.nacos.FetchStreamMachineService;
import org.apache.ozhera.log.manager.service.nacos.MultipleNacosConfig;
import org.apache.ozhera.log.manager.service.nacos.impl.*;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.anno.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.map.HashedMap;
import org.apache.ozhera.log.model.*;
import javax.annotation.Resource;
@@ -411,6 +411,7 @@ public class MilogConfigNacosServiceImpl implements MilogConfigNacosService {
sinkConfig.setLogstoreName(logStoreDO.getLogstoreName());
sinkConfig.setKeyList(Utils.parse2KeyAndTypeList(logStoreDO.getKeyList(),
logStoreDO.getColumnTypeList()));
+ sinkConfig.setKeyOrderList(logStoreDO.getKeyList());
MilogEsClusterDO esInfo =
esCluster.getById(logStoreDO.getEsClusterId());
if (null != esInfo) {
sinkConfig.setEsIndex(logStoreDO.getEsIndex());
diff --git a/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml b/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml
index d5132e59..ab033294 100644
--- a/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml
+++ b/ozhera-log/log-manager/src/main/resources/mapper/MilogLogstailMapper.xml
@@ -33,5 +33,7 @@ http://www.apache.org/licenses/LICENSE-2.0
milog_logstore store
WHERE
tail.store_id = store.id
+ AND store.es_cluster_id IS NOT NULL
+ AND store.es_index IS NOT NULL
</select>
</mapper>
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
index 0e97d9b3..d73b5d3c 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
@@ -95,27 +95,30 @@ public class ConfigManager {
log.warn("[ConfigManager.initConfigManager] Nacos
configuration [dataID:{},group:{}]not found,exit initConfigManager",
spaceDataId, DEFAULT_GROUP_ID);
return;
}
- String uniqueMark = StreamUtils.getCurrentMachineMark();
+ String uniqueMarks = StreamUtils.getCurrentMachineMark();
Map<String, Map<Long, String>> config =
milogStreamConfig.getConfig();
- if (config.containsKey(uniqueMark)) {
- Map<Long, String> milogStreamDataMap = config.get(uniqueMark);
- log.info("[ConfigManager.initConfigManager]
uniqueMark:{},data:{}", uniqueMark, gson.toJson(milogStreamDataMap));
- for (Long spaceId : milogStreamDataMap.keySet()) {
- final String dataId = milogStreamDataMap.get(spaceId);
- // init spaceData config
- String logSpaceDataStr = nacosConfig.getConfigStr(dataId,
DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
- if (StringUtils.isNotEmpty(logSpaceDataStr)) {
- MilogSpaceData milogSpaceData =
GSON.fromJson(logSpaceDataStr, MilogSpaceData.class);
- if (null != milogSpaceData &&
!milogSpaceDataMap.containsKey(spaceId)) {
- MilogConfigListener configListener = new
MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData,
nacosConfig);
- addListener(spaceId, configListener);
- milogSpaceDataMap.put(spaceId, milogSpaceData);
- log.info("[ConfigManager.initStream] added log
config listener for spaceId:{},dataId:{}", spaceId, dataId);
+ String[] split = uniqueMarks.split(SYMBOL_COMMA);
+ for (String uniqueMark : split) {
+ if (config.containsKey(uniqueMark)) {
+ Map<Long, String> milogStreamDataMap =
config.get(uniqueMark);
+ log.info("[ConfigManager.initConfigManager]
uniqueMark:{},data:{}", uniqueMark, gson.toJson(milogStreamDataMap));
+ for (Long spaceId : milogStreamDataMap.keySet()) {
+ final String dataId = milogStreamDataMap.get(spaceId);
+ // init spaceData config
+ String logSpaceDataStr =
nacosConfig.getConfigStr(dataId, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
+ if (StringUtils.isNotEmpty(logSpaceDataStr)) {
+ MilogSpaceData milogSpaceData =
GSON.fromJson(logSpaceDataStr, MilogSpaceData.class);
+ if (null != milogSpaceData &&
!milogSpaceDataMap.containsKey(spaceId)) {
+ MilogConfigListener configListener = new
MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData,
nacosConfig);
+ addListener(spaceId, configListener);
+ milogSpaceDataMap.put(spaceId, milogSpaceData);
+ log.info("[ConfigManager.initStream] added log
config listener for spaceId:{},dataId:{}", spaceId, dataId);
+ }
}
}
+ } else {
+ log.info("server start current not contain space
config,uniqueMark:{}", uniqueMark);
}
- } else {
- log.info("server start current not contain space
config,uniqueMark:{}", uniqueMark);
}
} catch (Exception e) {
log.error("[ConfigManager.initStream] initStream exec err", e);
@@ -167,7 +170,7 @@ public class ConfigManager {
private void processConfigForUniqueMark(String uniqueMark, Map<String,
Map<Long, String>> config) {
StreamCommonExtension extensionInstance =
getStreamCommonExtensionInstance();
if (!extensionInstance.checkUniqueMarkExists(uniqueMark, config)) {
- log.warn("listen dataID:{},groupId:{},but receive config is
empty", spaceDataId, DEFAULT_GROUP_ID);
+ log.warn("listen dataID:{},groupId:{},but receive config is
empty,uniqueMarks:{}", spaceDataId, DEFAULT_GROUP_ID, uniqueMark);
return;
}
Map<Long, String> dataIdMap =
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index 1a9227b1..3270852e 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -160,6 +160,7 @@ public class JobManager {
.tag(logtailConfig.getTag())
.index(sinkConfig.getEsIndex())
.keyList(sinkConfig.getKeyList())
+ .keyOrderList(sinkConfig.getKeyOrderList())
.valueList(logtailConfig.getValueList())
.parseScript(logtailConfig.getParseScript())
.logStoreName(sinkConfig.getLogstoreName())
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java
index 476ccc55..51658765 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/SinkJobConfig.java
@@ -18,13 +18,13 @@
*/
package org.apache.ozhera.log.stream.job;
-import org.apache.ozhera.log.model.StorageInfo;
-import org.apache.ozhera.log.stream.common.SinkJobEnum;
-import org.apache.ozhera.log.stream.sink.SinkChain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.ozhera.log.model.StorageInfo;
+import org.apache.ozhera.log.stream.common.SinkJobEnum;
+import org.apache.ozhera.log.stream.sink.SinkChain;
import java.util.List;
@@ -47,6 +47,9 @@ public class SinkJobConfig extends LogConfig {
private String tag;
private String index;
private String keyList;
+
+ private String keyOrderList;
+
private String valueList;
private String parseScript;
private String logStoreName;
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
index 07047a8e..c3232dbc 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/impl/DefaultStreamCommonExtension.java
@@ -18,14 +18,15 @@
*/
package org.apache.ozhera.log.stream.job.extension.impl;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
import org.apache.ozhera.log.model.LogtailConfig;
import org.apache.ozhera.log.model.SinkConfig;
import org.apache.ozhera.log.stream.job.extension.StreamCommonExtension;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
import java.util.Map;
+import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
import static
org.apache.ozhera.log.stream.common.LogStreamConstants.DEFAULT_COMMON_STREAM_EXTENSION;
/**
@@ -42,8 +43,14 @@ public class DefaultStreamCommonExtension implements StreamCommonExtension {
return data;
}
- public Boolean checkUniqueMarkExists(String uniqueMark, Map<String,
Map<Long, String>> config) {
- return config.containsKey(uniqueMark);
+ public Boolean checkUniqueMarkExists(String uniqueMarks, Map<String,
Map<Long, String>> config) {
+ String[] split = uniqueMarks.split(SYMBOL_COMMA);
+ for (String s : split) {
+ if (config.containsKey(s)) {
+ return true;
+ }
+ }
+ return false;
}
public Map<Long, String> getConfigMapByUniqueMark(Map<String, Map<Long,
String>> config, String uniqueMark) {
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java
index aaea9c17..feb228e6 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/kafka/KafkaSinkJobProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.ozhera.log.stream.job.extension.kafka;
+import com.xiaomi.youpin.docean.anno.Service;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.ozhera.log.parse.LogParser;
import org.apache.ozhera.log.parse.LogParserFactory;
import org.apache.ozhera.log.stream.common.LogStreamConstants;
@@ -29,8 +31,6 @@ import org.apache.ozhera.log.stream.job.extension.MessageSenderFactory;
import org.apache.ozhera.log.stream.job.extension.SinkJob;
import org.apache.ozhera.log.stream.job.extension.SinkJobProvider;
import org.apache.ozhera.log.stream.sink.SinkChain;
-import com.xiaomi.youpin.docean.anno.Service;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* @author wtt
@@ -51,7 +51,7 @@ public class KafkaSinkJobProvider implements SinkJobProvider {
LogParser logParser = LogParserFactory.getLogParser(
sinkJobConfig.getParseType(), sinkJobConfig.getKeyList(),
sinkJobConfig.getValueList(),
sinkJobConfig.getParseScript(), sinkJobConfig.getTopic(),
sinkJobConfig.getTail(),
- sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName());
+ sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName(),
sinkJobConfig.getKeyOrderList());
LogDataTransfer dataTransfer = new LogDataTransfer(sinkChain,
logParser, messageSender, sinkJobConfig);
dataTransfer.setJobType(jobType);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java
index 268c4723..a02e706e 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/extension/rocketmq/RocketMqSinkJobProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.ozhera.log.stream.job.extension.rocketmq;
+import com.xiaomi.youpin.docean.anno.Service;
import org.apache.ozhera.log.parse.LogParser;
import org.apache.ozhera.log.parse.LogParserFactory;
import org.apache.ozhera.log.stream.common.LogStreamConstants;
@@ -29,7 +30,6 @@ import org.apache.ozhera.log.stream.job.extension.MessageSenderFactory;
import org.apache.ozhera.log.stream.job.extension.SinkJob;
import org.apache.ozhera.log.stream.job.extension.SinkJobProvider;
import org.apache.ozhera.log.stream.sink.SinkChain;
-import com.xiaomi.youpin.docean.anno.Service;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
/**
@@ -49,7 +49,7 @@ public class RocketMqSinkJobProvider implements SinkJobProvider {
LogParser logParser = LogParserFactory.getLogParser(
sinkJobConfig.getParseType(), sinkJobConfig.getKeyList(),
sinkJobConfig.getValueList(),
sinkJobConfig.getParseScript(), sinkJobConfig.getTopic(),
sinkJobConfig.getTail(),
- sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName());
+ sinkJobConfig.getTag(), sinkJobConfig.getLogStoreName(),
sinkJobConfig.getKeyOrderList());
LogDataTransfer dataTransfer = new LogDataTransfer(sinkChain,
logParser, messageSender, sinkJobConfig);
dataTransfer.setJobType(jobType);
diff --git a/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java b/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java
index 090c42c2..f6f84962 100644
--- a/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java
+++ b/ozhera-log/log-stream/src/test/java/org/apache/ozhera/log/stream/TestSomething.java
@@ -19,11 +19,11 @@
package org.apache.ozhera.log.stream;
import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.api.model.msg.LineMessage;
import org.apache.ozhera.log.parse.LogParser;
import org.apache.ozhera.log.parse.LogParserFactory;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.util.HashMap;
@@ -92,7 +92,7 @@ public class TestSomething {
//RmqSinkJob rmqSinkJob = new RmqSinkJob();
Integer parserType =
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
- LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName);
+ LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, "");
// rmqSinkJob.setLogParser(customParse);
@@ -105,6 +105,6 @@ public class TestSomething {
Map<String, Map<String, String>> testMap = new HashMap<>();
Map<String, String> test = testMap.computeIfAbsent("test", k -> new
HashMap<>());
test.put("sfsdf", "ersrser");
- log.info("result:{}",GSON.toJson(testMap));
+ log.info("result:{}", GSON.toJson(testMap));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]