This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 80a8747a refactor: refactoring log statistics function (#573)
80a8747a is described below

commit 80a8747ae4eab42299a3267375c7adf0a1f1a281
Author: wtt <[email protected]>
AuthorDate: Tue Apr 1 18:38:08 2025 +0800

    refactor: refactoring log statistics function (#573)
---
 .../manager/service/impl/LogCountServiceImpl.java  | 125 +++++++++------------
 .../manager/service/impl/LogTailServiceImpl.java   |  10 +-
 2 files changed, 58 insertions(+), 77 deletions(-)

diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
index b2df9182..20201366 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogCountServiceImpl.java
@@ -19,7 +19,6 @@
 package org.apache.ozhera.log.manager.service.impl;
 
 import cn.hutool.core.collection.ListUtil;
-import com.google.common.collect.Lists;
 import com.xiaomi.youpin.docean.anno.Service;
 import com.xiaomi.youpin.docean.common.StringUtils;
 import com.xiaomi.youpin.docean.plugin.es.EsService;
@@ -38,7 +37,7 @@ import org.apache.ozhera.log.manager.model.pojo.LogCountDO;
 import org.apache.ozhera.log.manager.service.LogCountService;
 import org.apache.ozhera.log.utils.DateUtils;
 import org.elasticsearch.client.core.CountRequest;
-import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
+import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
@@ -148,92 +147,70 @@ public class LogCountServiceImpl implements 
LogCountService {
             thisDay = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
         }
         Long thisDayFirstMillisecond = 
DateUtils.getThisDayFirstMillisecond(thisDay);
-        List<LogCountDO> logCountDOList = new ArrayList();
-        List<Map<String, Object>> tailList = 
logtailMapper.getAllTailForCount();
-        Map<Long, List<String>> existIndexMap = new HashMap<>();
+
         long res = 0;
-        if (tailList.size() > 2000) {
-            List<List<Map<String, Object>>> partitionList = 
ListUtil.partition(tailList, 1000);
-            for (List<Map<String, Object>> mapList : partitionList) {
-                res += calculateAndInsertLogCounts(thisDay, mapList, 
existIndexMap, thisDayFirstMillisecond, logCountDOList);
+        List<Map<String, Object>> tailList = 
logtailMapper.getAllTailForCount();
+        if (tailList.size() > 5000) {
+            //group up to 2000 items per group
+            List<List<Map<String, Object>>> partitionList = 
ListUtil.partition(tailList, 2000);
+            for (List<Map<String, Object>> partition : partitionList) {
+                res += calculateLogCount(thisDay, partition, 
thisDayFirstMillisecond, res);
             }
         } else {
-            res = calculateAndInsertLogCounts(thisDay, tailList, 
existIndexMap, thisDayFirstMillisecond, logCountDOList);
+            res = calculateLogCount(thisDay, tailList, 
thisDayFirstMillisecond, res);
         }
         log.info("End of statistics log,Should be counted{},Total 
statistics{}", tailList.size(), res);
     }
 
-    private long calculateAndInsertLogCounts(String thisDay, List<Map<String, 
Object>> tailList, Map<Long, List<String>> existIndexMap, Long 
thisDayFirstMillisecond, List<LogCountDO> logCountDOList) {
-        String esIndex;
-        EsService esService;
-        Long total;
-        LogCountDO logCountDO;
+    private long calculateLogCount(String thisDay, List<Map<String, Object>> 
tailList, Long thisDayFirstMillisecond, long res) {
+        List<LogCountDO> logCountDOList = new ArrayList<>();
         for (Map<String, Object> tail : tailList) {
-            try {
-                esIndex = String.valueOf(tail.get("es_index"));
-                if (StringUtils.isEmpty(esIndex) || tail.get("es_cluster_id") 
== null) {
-                    total = 0l;
-                    esIndex = "";
-                } else {
-                    long clusterId = 
Long.parseLong(String.valueOf(tail.get("es_cluster_id")));
-                    esService = esCluster.getEsService(clusterId);
-                    if (esService == null) {
-                        log.warn("Statistics logs warn,tail:{} the logs are 
not counted and the ES client is not generated", tail);
-                        continue;
-                    }
-
-                    existIndexMap.computeIfAbsent(clusterId, k -> new 
ArrayList<>());
-                    List<String> clusterIndexes = existIndexMap.get(clusterId);
-
-                    if (!clusterIndexes.contains(esIndex)) {
-                        if (existsTemplate(esService, esIndex)) {
-                            clusterIndexes.add(esIndex);
-                        } else {
-                            continue;
-                        }
-                    }
-                    total = countLogs(esService, esIndex, tail, 
thisDayFirstMillisecond);
-                }
-                logCountDO = new LogCountDO();
-                
logCountDO.setTailId(Long.parseLong(String.valueOf(tail.get("id"))));
-                logCountDO.setEsIndex(esIndex);
-                logCountDO.setNumber(total);
-                logCountDO.setDay(thisDay);
-                logCountDOList.add(logCountDO);
-            } catch (Exception e) {
-                log.error("collectLogCount error,thisDay:{}", thisDay, e);
-            }
+            collectLogCount(thisDay, tail, thisDayFirstMillisecond, 
logCountDOList);
         }
-        long res = 0;
         if (CollectionUtils.isNotEmpty(logCountDOList)) {
             res = logCountMapper.batchInsert(logCountDOList);
         }
         return res;
     }
 
-    private Long countLogs(EsService esService, String esIndex, Map<String, 
Object> tail, Long startTime) {
-        SearchSourceBuilder builder = new SearchSourceBuilder();
-        builder.query(QueryBuilders.boolQuery()
-                .filter(QueryBuilders.termQuery("tailId", tail.get("id")))
-                
.filter(QueryBuilders.rangeQuery("timestamp").from(startTime).to(startTime + 
DateUtils.dayms - 1))
-        );
-
-        CountRequest countRequest = new CountRequest(esIndex);
-        countRequest.source(builder);
-
+    private void collectLogCount(String thisDay, Map<String, Object> tail, 
Long thisDayFirstMillisecond, List<LogCountDO> logCountDOList) {
+        String esIndex;
+        LogCountDO logCountDO;
+        Long total;
+        EsService esService;
         try {
-            return esService.count(countRequest);
+            esIndex = String.valueOf(tail.get("es_index"));
+            if (StringUtils.isEmpty(esIndex) || tail.get("es_cluster_id") == 
null) {
+                total = 0l;
+                esIndex = "";
+            } else {
+                esService = 
esCluster.getEsService(Long.parseLong(String.valueOf(tail.get("es_cluster_id"))));
+                if (esService == null) {
+                    log.warn("Statistics logs warn,tail:{} the logs are not 
counted and the ES client is not generated", tail);
+                    return;
+                }
+                SearchSourceBuilder builder = new SearchSourceBuilder();
+                BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
+                boolQueryBuilder.filter(QueryBuilders.termQuery("tail", 
tail.get("tail")));
+                
boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp").from(thisDayFirstMillisecond).to(thisDayFirstMillisecond
 + DateUtils.dayms - 1));
+                builder.query(boolQueryBuilder);
+                // statistics
+                CountRequest countRequest = new CountRequest();
+                countRequest.indices(esIndex);
+                countRequest.source(builder);
+                total = esService.count(countRequest);
+            }
+            logCountDO = new LogCountDO();
+            
logCountDO.setTailId(Long.parseLong(String.valueOf(tail.get("id"))));
+            logCountDO.setEsIndex(esIndex);
+            logCountDO.setNumber(total);
+            logCountDO.setDay(thisDay);
+            logCountDOList.add(logCountDO);
         } catch (Exception e) {
-            log.error("Failed to count logs for index [{}] and tail [{}]", 
esIndex, tail, e);
-            return 0L;
+            log.error("collectLogCount error,thisDay:{}", thisDay, e);
         }
     }
 
-    public boolean existsTemplate(EsService esService, String templateName) 
throws IOException {
-        IndexTemplatesExistRequest request = new 
IndexTemplatesExistRequest(templateName);
-        return esService.existsTemplate(request);
-    }
-
     @Override
     public boolean isLogtailCountDone(String day) {
         Long logTailCountDone = logCountMapper.isLogtailCountDone(day);
@@ -258,18 +235,18 @@ public class LogCountServiceImpl implements 
LogCountService {
         logtailCollectTrendMap.clear();
     }
 
-    private String getLogNumberFormat(long number) {
+    private static String getLogNumberFormat(long number) {
         NumberFormat format = NumberFormat.getInstance();
         format.setMaximumFractionDigits(2);
         format.setMinimumFractionDigits(2);
         if (number >= 100000000) {
-            return format.format((float) number / 100000000) + "hundred 
million";
+            return format.format((float) number / 100000000) + " hundred 
million";
         } else if (number >= 1000000) {
-            return format.format((float) number / 1000000) + "million";
+            return format.format((float) number / 1000000) + " million";
         } else if (number >= 10000) {
-            return format.format((float) number / 10000) + "ten thousand";
+            return format.format((float) number / 10000) + " ten thousand";
         } else {
-            return number + "strip";
+            return number + " strip";
         }
     }
 
@@ -333,4 +310,4 @@ public class LogCountServiceImpl implements LogCountService 
{
         return Result.success(spaceCollectTrendCache.get(spaceId));
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
index 79e4873a..0ac1c429 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/LogTailServiceImpl.java
@@ -432,7 +432,7 @@ public class LogTailServiceImpl extends BaseService 
implements LogTailService {
             defines.add(filterDefine);
         }
 
-        MilogLogTailDo milogLogtailDo = logTailParam2Do(param, logStoreDO, 
appBaseInfo);
+        MilogLogTailDo milogLogtailDo = convertLogTailParamToDo(ret, param, 
logStoreDO, appBaseInfo);
         wrapBaseCommon(milogLogtailDo, OperateEnum.UPDATE_OPERATE);
         boolean isSucceed = milogLogtailDao.update(milogLogtailDo);
 
@@ -640,6 +640,10 @@ public class LogTailServiceImpl extends BaseService 
implements LogTailService {
 
     private MilogLogTailDo logTailParam2Do(LogTailParam logTailParam, 
MilogLogStoreDO logStoreDO, AppBaseInfo appBaseInfo) {
         MilogLogTailDo milogLogtailDo = new MilogLogTailDo();
+        return convertLogTailParamToDo(milogLogtailDo, logTailParam, 
logStoreDO, appBaseInfo);
+    }
+
+    private MilogLogTailDo convertLogTailParamToDo(MilogLogTailDo 
milogLogtailDo, LogTailParam logTailParam, MilogLogStoreDO logStoreDO, 
AppBaseInfo appBaseInfo) {
         milogLogtailDo.setId(logTailParam.getId());
         milogLogtailDo.setTail(logTailParam.getTail());
         milogLogtailDo.setSpaceId(logTailParam.getSpaceId());
@@ -664,7 +668,7 @@ public class LogTailServiceImpl extends BaseService 
implements LogTailService {
         milogLogtailDo.setValueList(logTailParam.getValueList());
         FilterDefine filterDefine = 
FilterDefine.consRateLimitFilterDefine(logTailParam.getTailRate());
         if (filterDefine != null) {
-            milogLogtailDo.setFilter(Arrays.asList(filterDefine));
+            milogLogtailDo.setFilter(List.of(filterDefine));
         }
         tailExtensionService.logTailDoExtraFiled(milogLogtailDo, logStoreDO, 
logTailParam);
         milogLogtailDo.setDeployWay(logTailParam.getDeployWay());
@@ -922,4 +926,4 @@ public class LogTailServiceImpl extends BaseService 
implements LogTailService {
         }
         return Result.success(applyQueryVO(logTailDos.get(logTailDos.size() - 
1), null));
     }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to