This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 36867a64 refactor: refactor channelService creation logic (#562)
36867a64 is described below
commit 36867a6434a7e55348201be6d8a7bbb8a0bedcaa
Author: wtt <[email protected]>
AuthorDate: Thu Mar 13 19:11:33 2025 +0800
refactor: refactor channelService creation logic (#562)
* refactor: refactor channelService creation logic
* refactor: optimize log query and export functions
---
.../log/agent/channel/AbstractChannelService.java | 6 +-
.../ozhera/log/agent/channel/ChannelEngine.java | 29 +-----
.../log/agent/channel/ChannelServiceFactory.java | 113 +++++++++++++++++++++
.../log/agent/channel/ChannelServiceImpl.java | 41 ++++----
.../org/apache/ozhera/log/utils/ConfigUtils.java | 12 ++-
.../manager/service/impl/EsDataServiceImpl.java | 39 ++-----
6 files changed, 161 insertions(+), 79 deletions(-)
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index bf011466..fb2633aa 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -51,7 +51,7 @@ public abstract class AbstractChannelService implements
ChannelService {
public String instanceId = UUID.randomUUID().toString();
- private final int FILTER_LOG_PREFIX_LENGTH =
Integer.parseInt(getConfigValue("filter_log_level_prefix_length"));
+ private final int FILTER_LOG_PREFIX_LENGTH =
Integer.parseInt(getConfigValue("filter_log_level_prefix_length", "60"));
@Override
public String instanceId() {
@@ -206,11 +206,11 @@ public abstract class AbstractChannelService implements
ChannelService {
fileProgress.setCtTime(ct);
}
- public Boolean shouldFilterLogs(List<String> logLevelList, String line){
+ public Boolean shouldFilterLogs(List<String> logLevelList, String line) {
if (logLevelList == null || logLevelList.isEmpty()) {
return false;
}
- if (line.length() > FILTER_LOG_PREFIX_LENGTH){
+ if (line.length() > FILTER_LOG_PREFIX_LENGTH) {
line = line.substring(0, FILTER_LOG_PREFIX_LENGTH);
}
String lineLowerCase = line.toLowerCase();
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index c6312672..41563db9 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -18,7 +18,6 @@
*/
package org.apache.ozhera.log.agent.channel;
-import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Pair;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -64,9 +63,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.ozhera.log.common.Constant.GSON;
-import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
-import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
-import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
/**
* @author shanwb
@@ -89,6 +85,8 @@ public class ChannelEngine {
*/
private FileMonitorListener fileMonitorListener;
+ private ChannelServiceFactory channelServiceFactory;
+
private String memoryBasePath;
private Gson gson = GSON;
@@ -114,6 +112,8 @@ public class ChannelEngine {
agentMemoryService = new AgentMemoryServiceImpl(memoryBasePath);
fileMonitorListener = new DefaultFileMonitorListener();
+ channelServiceFactory = new
ChannelServiceFactory(agentMemoryService, memoryBasePath);
+
log.info("query channelDefineList:{}",
gson.toJson(channelDefineList));
channelServiceList = channelDefineList.stream()
.filter(channelDefine ->
filterCollStart(channelDefine.getAppName()))
@@ -268,32 +268,13 @@ public class ChannelEngine {
if (null == agentMemoryService) {
agentMemoryService = new
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
AgentMemoryService.DEFAULT_BASE_PATH));
}
- return createChannelService(channelDefine, exporter, filterChain);
+ return channelServiceFactory.createChannelService(channelDefine,
exporter, filterChain);
} catch (Throwable e) {
log.error("channelServiceTrans exception, channelDefine:{}",
gson.toJson(channelDefine), e);
}
return null;
}
- private ChannelService createChannelService(ChannelDefine channelDefine,
MsgExporter exporter, FilterChain filterChain) {
- Input input = channelDefine.getInput();
- String logType = channelDefine.getInput().getType();
- boolean containsWildcard =
isWildcardAllowedForLogType(input.getLogPattern(), logType);
- if (containsWildcard || FileUtil.exist(input.getLogPattern())) {
- return new ChannelServiceImpl(exporter, agentMemoryService,
channelDefine, filterChain);
- } else {
- return new WildcardChannelServiceImpl(exporter,
agentMemoryService, channelDefine, filterChain, memoryBasePath);
- }
- }
-
- private boolean isWildcardAllowedForLogType(String logPattern, String
logType) {
- if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType)) {
- return true;
- }
- return Arrays.stream(logPattern.split(SYMBOL_COMMA))
- .noneMatch(data -> StringUtils.substringAfterLast(data,
SEPARATOR).contains(PATH_WILDCARD));
- }
-
private void preCheckChannelDefine(ChannelDefine channelDefine) {
Preconditions.checkArgument(null != channelDefine, "channelDefine can
not be null");
Preconditions.checkArgument(null != channelDefine.getInput(),
"channelDefine.input can not be null");
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
new file mode 100644
index 00000000..20dc82b2
--- /dev/null
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.channel;
+
+import cn.hutool.core.io.FileUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
+import org.apache.ozhera.log.agent.export.MsgExporter;
+import org.apache.ozhera.log.agent.filter.FilterChain;
+import org.apache.ozhera.log.agent.input.Input;
+import org.apache.ozhera.log.api.enums.LogTypeEnum;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
+import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
+import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2025/3/13 10:00
+ */
+public class ChannelServiceFactory {
+
+ private final AgentMemoryService agentMemoryService;
+ private final String memoryBasePath;
+
+ private static final Pattern regexCharsPattern =
Pattern.compile("[*+?^${}()|\\[\\]\\\\]");
+
+ public ChannelServiceFactory(AgentMemoryService agentMemoryService, String
memoryBasePath) {
+ this.agentMemoryService = agentMemoryService;
+ this.memoryBasePath = memoryBasePath;
+ }
+
+ public ChannelService createChannelService(ChannelDefine channelDefine,
+ MsgExporter exporter,
FilterChain filterChain) {
+ if (channelDefine == null || channelDefine.getInput() == null) {
+ throw new IllegalArgumentException("Channel define or input cannot
be null");
+ }
+
+ Input input = channelDefine.getInput();
+ String logType = input.getType();
+ String logPattern = input.getLogPattern();
+
+ if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType) ||
FileUtil.exist(logPattern)) {
+ return createStandardChannelService(exporter, channelDefine,
filterChain);
+ }
+
+ if (shouldUseWildcardService(logPattern)) {
+ return createWildcardChannelService(exporter, channelDefine,
filterChain);
+ }
+
+ return createStandardChannelService(exporter, channelDefine,
filterChain);
+ }
+
+ private boolean shouldUseWildcardService(String logPattern) {
+ if (StringUtils.isEmpty(logPattern)) {
+ return false;
+ }
+ return isRegexPattern(logPattern) ||
+ containsWildcard(logPattern);
+ }
+
+ private boolean containsWildcard(String logPattern) {
+ return Arrays.stream(logPattern.split(SYMBOL_COMMA))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
+ .anyMatch(this::hasWildcardInPath);
+ }
+
+ private boolean hasWildcardInPath(String path) {
+ String fileName = StringUtils.substringAfterLast(path, SEPARATOR);
+ return !StringUtils.isEmpty(fileName) &&
fileName.contains(PATH_WILDCARD);
+ }
+
+ private boolean isRegexPattern(String logPattern) {
+ if (StringUtils.isEmpty(logPattern)) {
+ return false;
+ }
+ return regexCharsPattern.matcher(logPattern).find();
+ }
+
+ private ChannelService createStandardChannelService(MsgExporter exporter,
+ ChannelDefine
channelDefine, FilterChain filterChain) {
+ return new ChannelServiceImpl(exporter, agentMemoryService,
+ channelDefine, filterChain);
+ }
+
+ private ChannelService createWildcardChannelService(MsgExporter exporter,
+ ChannelDefine
channelDefine, FilterChain filterChain) {
+ return new WildcardChannelServiceImpl(exporter, agentMemoryService,
+ channelDefine, filterChain, memoryBasePath);
+ }
+}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 3d976ddd..6e3a1ece 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -308,26 +308,27 @@ public class ChannelServiceImpl extends
AbstractChannelService {
return;
}
long ct = System.currentTimeMillis();
- readResult.get().getLines().stream().forEach(l -> {
- String logType = channelDefine.getInput().getType();
- LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
- // Multi-line application log type and opentelemetry type are
used to determine the exception stack
- if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum ||
LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
- l = mLog.append2(l);
- } else {
- // tail single line mode
- }
- if (null != l) {
- try {
- fileColLock.lock();
- wrapDataToSend(l, readResult, pattern, patternCode,
ip, ct);
- } finally {
- fileColLock.unlock();
- }
- } else {
- log.debug("biz log channelId:{}, not new line:{}",
channelDefine.getChannelId(), l);
- }
- });
+ readResult.get().getLines()
+ .stream().filter(l ->
!shouldFilterLogs(channelDefine.getFilterLogLevelList(), l)).forEach(l -> {
+ String logType = channelDefine.getInput().getType();
+ LogTypeEnum logTypeEnum =
LogTypeEnum.name2enum(logType);
+ // Multi-line application log type and opentelemetry
type are used to determine the exception stack
+ if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum ||
LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
+ l = mLog.append2(l);
+ } else {
+ // tail single line mode
+ }
+ if (null != l) {
+ try {
+ fileColLock.lock();
+ wrapDataToSend(l, readResult, pattern,
patternCode, ip, ct);
+ } finally {
+ fileColLock.unlock();
+ }
+ } else {
+ log.debug("biz log channelId:{}, not new line:{}",
channelDefine.getChannelId(), l);
+ }
+ });
});
resultMap.put(pattern, Pair.of(mLog, readResult));
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
index 0e98b3b7..3ae8e504 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
@@ -19,9 +19,9 @@
package org.apache.ozhera.log.utils;
import cn.hutool.core.util.HashUtil;
-import org.apache.ozhera.log.common.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.common.Config;
/**
* @author: wtt
@@ -36,23 +36,27 @@ public class ConfigUtils {
}
public static String getConfigValue(String propertyKey) {
+ return getConfigValue(propertyKey, "");
+ }
+
+ public static String getConfigValue(String propertyKey, String
defaultValue) {
String propertyValue = "";
propertyValue = System.getenv(propertyKey);
try {
- if(StringUtils.isBlank(propertyValue)) {
+ if (StringUtils.isBlank(propertyValue)) {
propertyValue = System.getProperty(propertyKey);
}
} catch (Exception e) {
log.error("get system param error,propertyKey:{}", propertyKey, e);
}
if (StringUtils.isBlank(propertyValue)) {
- propertyValue = Config.ins().get(propertyKey, "");
+ propertyValue = Config.ins().get(propertyKey, defaultValue);
}
return propertyValue;
}
/**
- * The data data maps to a value between 0 and max
+ * The data data maps to a value between 0 and max
*
* @param data
* @param max
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
index 8c5a07cf..5575d1c7 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
@@ -70,10 +70,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@@ -227,12 +224,16 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
}
// Build query parameters
BoolQueryBuilder boolQueryBuilder =
searchLog.getQueryBuilder(logQuery,
getKeyColonPrefix(milogLogstoreDO.getKeyList()));
- SearchSourceBuilder builder = assembleSearchSourceBuilder(logQuery,
keyList, boolQueryBuilder);
+ // use constant_score to wrap boolquerybuilder
+ ConstantScoreQueryBuilder constantScoreQueryBuilder =
QueryBuilders.constantScoreQuery(boolQueryBuilder);
+ SearchSourceBuilder builder = assembleSearchSourceBuilder(logQuery,
keyList, constantScoreQueryBuilder);
+ // disable score calculation
+ builder.trackScores(false);
SearchRequest searchRequest = new SearchRequest(new
String[]{esIndexName}, builder);
- boolean isTimestampMissing = isTimestampMissingInQuery(searchRequest);
- if (isTimestampMissing) {
+ boolean isTimestampMissing = containsTimestampField(boolQueryBuilder);
+ if (!isTimestampMissing) {
log.warn("searchRequest is missing timestamp field, add timestamp
field,logQuery:{}, operator:{}", GSON.toJson(logQuery), operator);
}
@@ -256,25 +257,6 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
return Result.success(dto);
}
- public static boolean isTimestampMissingInQuery(SearchRequest
searchRequest) {
- if (searchRequest == null) {
- return true;
- }
-
- SearchSourceBuilder sourceBuilder = searchRequest.source();
- if (sourceBuilder == null || sourceBuilder.query() == null) {
- return true;
- }
-
- // Check whether the query condition contains the timestamp field
- if (sourceBuilder.query() instanceof BoolQueryBuilder) {
- BoolQueryBuilder boolQuery = (BoolQueryBuilder)
sourceBuilder.query();
- return !containsTimestampField(boolQuery);
- }
-
- return true;
- }
-
/**
* Check if the timestamp field is included in the BoolQueryBuilder
*
@@ -412,7 +394,7 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
}
- private SearchSourceBuilder assembleSearchSourceBuilder(LogQuery logQuery,
List<String> keyList, BoolQueryBuilder boolQueryBuilder) {
+ private SearchSourceBuilder assembleSearchSourceBuilder(LogQuery logQuery,
List<String> keyList, ConstantScoreQueryBuilder boolQueryBuilder) {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(boolQueryBuilder);
@@ -753,8 +735,9 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
public void logExport(LogQuery logQuery) throws Exception {
+ log.info("Log query, logExport, logQuery:{}", GSON.toJson(logQuery));
// Generate Excel
- int maxLogNum = 10000;
+ int maxLogNum = 5000;
logQuery.setPageSize(maxLogNum);
Result<LogDTO> logDTOResult = this.logQuery(logQuery);
List<Map<String, Object>> exportData = logDTOResult.getCode() !=
CommonError.Success.getCode() || logDTOResult.getData().getLogDataDTOList() ==
null || logDTOResult.getData().getLogDataDTOList().isEmpty() ? null :
logDTOResult.getData().getLogDataDTOList().stream().map(logDataDto ->
ExportUtils.SplitTooLongContent(logDataDto)).collect(Collectors.toList());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]