This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 36867a64 refactor: refactor channelService creation logic (#562)
36867a64 is described below

commit 36867a6434a7e55348201be6d8a7bbb8a0bedcaa
Author: wtt <[email protected]>
AuthorDate: Thu Mar 13 19:11:33 2025 +0800

    refactor: refactor channelService creation logic (#562)
    
    * refactor: refactor channelService creation logic
    
    * refactor: optimize log query and export functions
---
 .../log/agent/channel/AbstractChannelService.java  |   6 +-
 .../ozhera/log/agent/channel/ChannelEngine.java    |  29 +-----
 .../log/agent/channel/ChannelServiceFactory.java   | 113 +++++++++++++++++++++
 .../log/agent/channel/ChannelServiceImpl.java      |  41 ++++----
 .../org/apache/ozhera/log/utils/ConfigUtils.java   |  12 ++-
 .../manager/service/impl/EsDataServiceImpl.java    |  39 ++-----
 6 files changed, 161 insertions(+), 79 deletions(-)

diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index bf011466..fb2633aa 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -51,7 +51,7 @@ public abstract class AbstractChannelService implements 
ChannelService {
 
     public String instanceId = UUID.randomUUID().toString();
 
-    private final int FILTER_LOG_PREFIX_LENGTH = 
Integer.parseInt(getConfigValue("filter_log_level_prefix_length"));
+    private final int FILTER_LOG_PREFIX_LENGTH = 
Integer.parseInt(getConfigValue("filter_log_level_prefix_length", "60"));
 
     @Override
     public String instanceId() {
@@ -206,11 +206,11 @@ public abstract class AbstractChannelService implements 
ChannelService {
         fileProgress.setCtTime(ct);
     }
 
-    public Boolean shouldFilterLogs(List<String> logLevelList, String line){
+    public Boolean shouldFilterLogs(List<String> logLevelList, String line) {
         if (logLevelList == null || logLevelList.isEmpty()) {
             return false;
         }
-        if (line.length() > FILTER_LOG_PREFIX_LENGTH){
+        if (line.length() > FILTER_LOG_PREFIX_LENGTH) {
             line = line.substring(0, FILTER_LOG_PREFIX_LENGTH);
         }
         String lineLowerCase = line.toLowerCase();
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index c6312672..41563db9 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -18,7 +18,6 @@
  */
 package org.apache.ozhera.log.agent.channel;
 
-import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.lang.Pair;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -64,9 +63,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.ozhera.log.common.Constant.GSON;
-import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
-import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
-import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
 
 /**
  * @author shanwb
@@ -89,6 +85,8 @@ public class ChannelEngine {
      */
     private FileMonitorListener fileMonitorListener;
 
+    private ChannelServiceFactory channelServiceFactory;
+
     private String memoryBasePath;
 
     private Gson gson = GSON;
@@ -114,6 +112,8 @@ public class ChannelEngine {
             agentMemoryService = new AgentMemoryServiceImpl(memoryBasePath);
             fileMonitorListener = new DefaultFileMonitorListener();
 
+            channelServiceFactory = new 
ChannelServiceFactory(agentMemoryService, memoryBasePath);
+
             log.info("query channelDefineList:{}", 
gson.toJson(channelDefineList));
             channelServiceList = channelDefineList.stream()
                     .filter(channelDefine -> 
filterCollStart(channelDefine.getAppName()))
@@ -268,32 +268,13 @@ public class ChannelEngine {
             if (null == agentMemoryService) {
                 agentMemoryService = new 
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
 AgentMemoryService.DEFAULT_BASE_PATH));
             }
-            return createChannelService(channelDefine, exporter, filterChain);
+            return channelServiceFactory.createChannelService(channelDefine, 
exporter, filterChain);
         } catch (Throwable e) {
             log.error("channelServiceTrans exception, channelDefine:{}", 
gson.toJson(channelDefine), e);
         }
         return null;
     }
 
-    private ChannelService createChannelService(ChannelDefine channelDefine, 
MsgExporter exporter, FilterChain filterChain) {
-        Input input = channelDefine.getInput();
-        String logType = channelDefine.getInput().getType();
-        boolean containsWildcard = 
isWildcardAllowedForLogType(input.getLogPattern(), logType);
-        if (containsWildcard || FileUtil.exist(input.getLogPattern())) {
-            return new ChannelServiceImpl(exporter, agentMemoryService, 
channelDefine, filterChain);
-        } else {
-            return new WildcardChannelServiceImpl(exporter, 
agentMemoryService, channelDefine, filterChain, memoryBasePath);
-        }
-    }
-
-    private boolean isWildcardAllowedForLogType(String logPattern, String 
logType) {
-        if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType)) {
-            return true;
-        }
-        return Arrays.stream(logPattern.split(SYMBOL_COMMA))
-                .noneMatch(data -> StringUtils.substringAfterLast(data, 
SEPARATOR).contains(PATH_WILDCARD));
-    }
-
     private void preCheckChannelDefine(ChannelDefine channelDefine) {
         Preconditions.checkArgument(null != channelDefine, "channelDefine can 
not be null");
         Preconditions.checkArgument(null != channelDefine.getInput(), 
"channelDefine.input can not be null");
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
new file mode 100644
index 00000000..20dc82b2
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.channel;
+
+import cn.hutool.core.io.FileUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
+import org.apache.ozhera.log.agent.export.MsgExporter;
+import org.apache.ozhera.log.agent.filter.FilterChain;
+import org.apache.ozhera.log.agent.input.Input;
+import org.apache.ozhera.log.api.enums.LogTypeEnum;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
+import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
+import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2025/3/13 10:00
+ */
+public class ChannelServiceFactory {
+
+    private final AgentMemoryService agentMemoryService;
+    private final String memoryBasePath;
+
+    private static final Pattern regexCharsPattern = 
Pattern.compile("[*+?^${}()|\\[\\]\\\\]");
+
+    public ChannelServiceFactory(AgentMemoryService agentMemoryService, String 
memoryBasePath) {
+        this.agentMemoryService = agentMemoryService;
+        this.memoryBasePath = memoryBasePath;
+    }
+
+    public ChannelService createChannelService(ChannelDefine channelDefine,
+                                               MsgExporter exporter, 
FilterChain filterChain) {
+        if (channelDefine == null || channelDefine.getInput() == null) {
+            throw new IllegalArgumentException("Channel define or input cannot 
be null");
+        }
+
+        Input input = channelDefine.getInput();
+        String logType = input.getType();
+        String logPattern = input.getLogPattern();
+
+        if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType) || 
FileUtil.exist(logPattern)) {
+            return createStandardChannelService(exporter, channelDefine, 
filterChain);
+        }
+
+        if (shouldUseWildcardService(logPattern)) {
+            return createWildcardChannelService(exporter, channelDefine, 
filterChain);
+        }
+
+        return createStandardChannelService(exporter, channelDefine, 
filterChain);
+    }
+
+    private boolean shouldUseWildcardService(String logPattern) {
+        if (StringUtils.isEmpty(logPattern)) {
+            return false;
+        }
+        return isRegexPattern(logPattern) ||
+                containsWildcard(logPattern);
+    }
+
+    private boolean containsWildcard(String logPattern) {
+        return Arrays.stream(logPattern.split(SYMBOL_COMMA))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .anyMatch(this::hasWildcardInPath);
+    }
+
+    private boolean hasWildcardInPath(String path) {
+        String fileName = StringUtils.substringAfterLast(path, SEPARATOR);
+        return !StringUtils.isEmpty(fileName) && 
fileName.contains(PATH_WILDCARD);
+    }
+
+    private boolean isRegexPattern(String logPattern) {
+        if (StringUtils.isEmpty(logPattern)) {
+            return false;
+        }
+        return regexCharsPattern.matcher(logPattern).find();
+    }
+
+    private ChannelService createStandardChannelService(MsgExporter exporter,
+                                                        ChannelDefine 
channelDefine, FilterChain filterChain) {
+        return new ChannelServiceImpl(exporter, agentMemoryService,
+                channelDefine, filterChain);
+    }
+
+    private ChannelService createWildcardChannelService(MsgExporter exporter,
+                                                        ChannelDefine 
channelDefine, FilterChain filterChain) {
+        return new WildcardChannelServiceImpl(exporter, agentMemoryService,
+                channelDefine, filterChain, memoryBasePath);
+    }
+}
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 3d976ddd..6e3a1ece 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -308,26 +308,27 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                 return;
             }
             long ct = System.currentTimeMillis();
-            readResult.get().getLines().stream().forEach(l -> {
-                String logType = channelDefine.getInput().getType();
-                LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
-                // Multi-line application log type and opentelemetry type are 
used to determine the exception stack
-                if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum || 
LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
-                    l = mLog.append2(l);
-                } else {
-                    // tail single line mode
-                }
-                if (null != l) {
-                    try {
-                        fileColLock.lock();
-                        wrapDataToSend(l, readResult, pattern, patternCode, 
ip, ct);
-                    } finally {
-                        fileColLock.unlock();
-                    }
-                } else {
-                    log.debug("biz log channelId:{}, not new line:{}", 
channelDefine.getChannelId(), l);
-                }
-            });
+            readResult.get().getLines()
+                    .stream().filter(l -> 
!shouldFilterLogs(channelDefine.getFilterLogLevelList(), l)).forEach(l -> {
+                        String logType = channelDefine.getInput().getType();
+                        LogTypeEnum logTypeEnum = 
LogTypeEnum.name2enum(logType);
+                        // Multi-line application log type and opentelemetry 
type are used to determine the exception stack
+                        if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum || 
LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
+                            l = mLog.append2(l);
+                        } else {
+                            // tail single line mode
+                        }
+                        if (null != l) {
+                            try {
+                                fileColLock.lock();
+                                wrapDataToSend(l, readResult, pattern, 
patternCode, ip, ct);
+                            } finally {
+                                fileColLock.unlock();
+                            }
+                        } else {
+                            log.debug("biz log channelId:{}, not new line:{}", 
channelDefine.getChannelId(), l);
+                        }
+                    });
 
         });
         resultMap.put(pattern, Pair.of(mLog, readResult));
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
index 0e98b3b7..3ae8e504 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/ConfigUtils.java
@@ -19,9 +19,9 @@
 package org.apache.ozhera.log.utils;
 
 import cn.hutool.core.util.HashUtil;
-import org.apache.ozhera.log.common.Config;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.common.Config;
 
 /**
  * @author: wtt
@@ -36,23 +36,27 @@ public class ConfigUtils {
     }
 
     public static String getConfigValue(String propertyKey) {
+        return getConfigValue(propertyKey, "");
+    }
+
+    public static String getConfigValue(String propertyKey, String 
defaultValue) {
         String propertyValue = "";
         propertyValue = System.getenv(propertyKey);
         try {
-            if(StringUtils.isBlank(propertyValue)) {
+            if (StringUtils.isBlank(propertyValue)) {
                 propertyValue = System.getProperty(propertyKey);
             }
         } catch (Exception e) {
             log.error("get system param error,propertyKey:{}", propertyKey, e);
         }
         if (StringUtils.isBlank(propertyValue)) {
-            propertyValue = Config.ins().get(propertyKey, "");
+            propertyValue = Config.ins().get(propertyKey, defaultValue);
         }
         return propertyValue;
     }
 
     /**
-     *  The data data maps to a value between 0 and max
+     * The data data maps to a value between 0 and max
      *
      * @param data
      * @param max
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
index 8c5a07cf..5575d1c7 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
@@ -70,10 +70,7 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.index.query.*;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@@ -227,12 +224,16 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
         }
         // Build query parameters
         BoolQueryBuilder boolQueryBuilder = 
searchLog.getQueryBuilder(logQuery, 
getKeyColonPrefix(milogLogstoreDO.getKeyList()));
-        SearchSourceBuilder builder = assembleSearchSourceBuilder(logQuery, 
keyList, boolQueryBuilder);
+        // use constant_score to wrap boolquerybuilder
+        ConstantScoreQueryBuilder constantScoreQueryBuilder = 
QueryBuilders.constantScoreQuery(boolQueryBuilder);
+        SearchSourceBuilder builder = assembleSearchSourceBuilder(logQuery, 
keyList, constantScoreQueryBuilder);
+        // disable score calculation
+        builder.trackScores(false);
 
         SearchRequest searchRequest = new SearchRequest(new 
String[]{esIndexName}, builder);
 
-        boolean isTimestampMissing = isTimestampMissingInQuery(searchRequest);
-        if (isTimestampMissing) {
+        boolean isTimestampMissing = containsTimestampField(boolQueryBuilder);
+        if (!isTimestampMissing) {
             log.warn("searchRequest is missing timestamp field, add timestamp 
field,logQuery:{}, operator:{}", GSON.toJson(logQuery), operator);
         }
 
@@ -256,25 +257,6 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
         return Result.success(dto);
     }
 
-    public static boolean isTimestampMissingInQuery(SearchRequest 
searchRequest) {
-        if (searchRequest == null) {
-            return true;
-        }
-
-        SearchSourceBuilder sourceBuilder = searchRequest.source();
-        if (sourceBuilder == null || sourceBuilder.query() == null) {
-            return true;
-        }
-
-        // Check whether the query condition contains the timestamp field
-        if (sourceBuilder.query() instanceof BoolQueryBuilder) {
-            BoolQueryBuilder boolQuery = (BoolQueryBuilder) 
sourceBuilder.query();
-            return !containsTimestampField(boolQuery);
-        }
-
-        return true;
-    }
-
     /**
      * Check if the timestamp field is included in the BoolQueryBuilder
      *
@@ -412,7 +394,7 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
     }
 
 
-    private SearchSourceBuilder assembleSearchSourceBuilder(LogQuery logQuery, 
List<String> keyList, BoolQueryBuilder boolQueryBuilder) {
+    private SearchSourceBuilder assembleSearchSourceBuilder(LogQuery logQuery, 
List<String> keyList, ConstantScoreQueryBuilder boolQueryBuilder) {
         SearchSourceBuilder builder = new SearchSourceBuilder();
         builder.query(boolQueryBuilder);
 
@@ -753,8 +735,9 @@ public class EsDataServiceImpl implements EsDataService, 
LogDataService, EsDataB
 
 
     public void logExport(LogQuery logQuery) throws Exception {
+        log.info("Log query, logExport, logQuery:{}", GSON.toJson(logQuery));
         // Generate Excel
-        int maxLogNum = 10000;
+        int maxLogNum = 5000;
         logQuery.setPageSize(maxLogNum);
         Result<LogDTO> logDTOResult = this.logQuery(logQuery);
         List<Map<String, Object>> exportData = logDTOResult.getCode() != 
CommonError.Success.getCode() || logDTOResult.getData().getLogDataDTOList() == 
null || logDTOResult.getData().getLogDataDTOList().isEmpty() ? null : 
logDTOResult.getData().getLogDataDTOList().stream().map(logDataDto -> 
ExportUtils.SplitTooLongContent(logDataDto)).collect(Collectors.toList());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to