This is an automated email from the ASF dual-hosted git repository.

dingtao pushed a commit to branch intelligence
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/intelligence by this push:
     new ffb00aa2 feat: offer HeraLogApiService service (#528)
ffb00aa2 is described below

commit ffb00aa23eaea68e1bd272c0e3de45c1b657ec68
Author: wtt <[email protected]>
AuthorDate: Mon Jan 20 14:58:47 2025 +0800

    feat: offer HeraLogApiService service (#528)
    
    * feat: add HeraLogApiService
    
    * feat: offer HeraLogApiService service
    
    * refactor: optimize the log query code and add comments
    
    * refactor: added query support for Doris storage type logs
---
 .../ozhera/log/api/model/dto/LogFilterOptions.java |  43 ++++
 .../ozhera/log/api/model/dto/LogUrlParam.java      |  40 +++
 .../ozhera/log/api/service/HeraLogApiService.java  |  39 +++
 .../service/impl/HeraLogApiServiceImpl.java        | 279 +++++++++++++++++++++
 4 files changed, 401 insertions(+)

diff --git 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/dto/LogFilterOptions.java
 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/dto/LogFilterOptions.java
new file mode 100644
index 00000000..d28209bf
--- /dev/null
+++ 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/dto/LogFilterOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.api.model.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2024/3/6 16:15
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class LogFilterOptions implements Serializable {
+    private Long projectId;
+    private Long envId;
+    private String traceId;
+    private String level;
+    private String startTime;
+    private String endTime;
+}
diff --git 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/dto/LogUrlParam.java
 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/dto/LogUrlParam.java
new file mode 100644
index 00000000..157892f2
--- /dev/null
+++ 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/dto/LogUrlParam.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.api.model.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2024/3/6 16:15
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class LogUrlParam implements Serializable {
+    private Long projectId;
+    private Long envId;
+    private String traceId;
+}
diff --git 
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/HeraLogApiService.java
 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/HeraLogApiService.java
new file mode 100644
index 00000000..eccbcf6a
--- /dev/null
+++ 
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/service/HeraLogApiService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.api.service;
+
+import org.apache.ozhera.log.api.model.dto.LogFilterOptions;
+import org.apache.ozhera.log.api.model.dto.LogUrlParam;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2024/3/6 16:12
+ */
+public interface HeraLogApiService {
+
+    List<String> queryLogUrl(LogUrlParam logUrlParam);
+
+    List<Map<String, Object>> queryLogData(LogFilterOptions filterOptions);
+
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeraLogApiServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeraLogApiServiceImpl.java
new file mode 100644
index 00000000..a5019c7e
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeraLogApiServiceImpl.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.manager.service.impl;
+
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Service;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.enums.LogStorageTypeEnum;
+import org.apache.ozhera.log.api.model.dto.LogFilterOptions;
+import org.apache.ozhera.log.api.model.dto.LogUrlParam;
+import org.apache.ozhera.log.api.service.HeraLogApiService;
+import org.apache.ozhera.log.common.Constant;
+import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
+import org.apache.ozhera.log.manager.dao.MilogLogstoreDao;
+import org.apache.ozhera.log.manager.domain.EsCluster;
+import org.apache.ozhera.log.manager.model.Pair;
+import org.apache.ozhera.log.manager.model.pojo.MilogEsClusterDO;
+import org.apache.ozhera.log.manager.model.pojo.MilogLogStoreDO;
+import org.apache.ozhera.log.manager.model.pojo.MilogLogTailDo;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import javax.annotation.Resource;
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Clock;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.ozhera.log.manager.user.MoneUserDetailService.GSON;
+
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2024/3/6 16:17
+ */
+@Slf4j
+@Service(interfaceClass = HeraLogApiService.class, group = "$dubbo.group", 
timeout = 10000)
+public class HeraLogApiServiceImpl implements HeraLogApiService {
+
+    private static final int QUERY_LIMIT = 20;
+    private static final String TIMESTAMP_FIELD = "timestamp";
+    private static final String LOG_TIME_FIELD = "log_time";
+    private static final String STORE_ID_FIELD = "storeId";
+    private static final String LEVEL_FIELD = "level";
+    private static final String TRACE_ID_FIELD = "traceId";
+
+    @Resource
+    private MilogLogTailDao milogLogTailDao;
+
+    @Resource
+    private MilogLogstoreDao milogLogstoreDao;
+
+    @Resource
+    private EsCluster esCluster;
+
+    @Value(value = "$hera.url")
+    private String heraUrl;
+
+    @Override
+    public List<String> queryLogUrl(LogUrlParam logUrlParam) {
+        List<String> urlList = new ArrayList<>();
+
+        List<MilogLogTailDo> logTailDos = 
milogLogTailDao.queryByAppId(logUrlParam.getProjectId());
+        if (CollectionUtils.isEmpty(logTailDos)) {
+            return urlList;
+        }
+        List<MilogLogTailDo> filteredLogTailDos = logTailDos.stream()
+                .filter(tailDo -> logUrlParam.getEnvId() == null || 
tailDo.getEnvId().equals(logUrlParam.getEnvId()))
+                .toList();
+
+        long curTimestamp = Clock.systemUTC().instant().toEpochMilli();
+        long fiveMinutesInMillis = TimeUnit.MINUTES.toMillis(5);
+
+        List<Pair<Long, Long>> pairList = filteredLogTailDos.stream()
+                .map(tail -> Pair.of(tail.getSpaceId(), tail.getStoreId()))
+                .distinct()
+                .toList();
+
+        String timeParam = buildTimeParam(curTimestamp, fiveMinutesInMillis);
+
+        for (Pair<Long, Long> pair : pairList) {
+            try {
+                String commonParam = buildCommonParam(pair, 
logUrlParam.getTraceId());
+
+                urlList.add(buildUrl(commonParam, timeParam));
+            } catch (Exception e) {
+                log.info("queryAccessLogList build data error,tail:{}", 
GSON.toJson(pair), e);
+            }
+        }
+
+        return urlList;
+    }
+
+    @Override
+    public List<Map<String, Object>> queryLogData(LogFilterOptions 
filterOptions) {
+        try {
+            List<MilogLogTailDo> milogLogTailDos = 
milogLogTailDao.queryByAppAndEnv(filterOptions.getProjectId(), 
filterOptions.getEnvId());
+            if (CollectionUtils.isEmpty(milogLogTailDos)) {
+                log.warn("No log tails found for projectId={}, envId={}", 
filterOptions.getProjectId(), filterOptions.getEnvId());
+                return Collections.emptyList();
+            }
+
+            MilogLogTailDo milogLogTailDo = 
milogLogTailDos.get(milogLogTailDos.size() - 1);
+            MilogLogStoreDO logStoreDO = 
milogLogstoreDao.queryById(milogLogTailDo.getStoreId());
+            MilogEsClusterDO cluster = 
esCluster.getById(logStoreDO.getEsClusterId());
+
+            LogStorageTypeEnum storageType = 
LogStorageTypeEnum.queryByName(cluster.getLogStorageType());
+            if (storageType == LogStorageTypeEnum.ELASTICSEARCH) {
+                return queryFromElasticsearch(filterOptions, logStoreDO);
+            } else if (storageType == LogStorageTypeEnum.DORIS) {
+                return queryFromDoris(filterOptions, logStoreDO);
+            } else {
+                log.error("unsupported log storage type: {}", storageType);
+                return Collections.emptyList();
+            }
+        } catch (Exception e) {
+            log.error("failed to query log data", e);
+            return Collections.emptyList();
+        }
+    }
+
+    private List<Map<String, Object>> queryFromElasticsearch(LogFilterOptions 
filterOptions, MilogLogStoreDO logStoreDO) throws IOException {
+        EsService esService = 
esCluster.getEsService(logStoreDO.getEsClusterId());
+        SearchSourceBuilder builder = buildSearchSourceBuilder(filterOptions, 
logStoreDO);
+        SearchRequest searchRequest = new 
SearchRequest(logStoreDO.getEsIndex()).source(builder);
+        SearchResponse searchResponse = esService.search(searchRequest);
+        return extractLogDataFromResponse(searchResponse);
+    }
+
+    private List<Map<String, Object>> queryFromDoris(LogFilterOptions 
filterOptions, MilogLogStoreDO logStoreDO) {
+        DataSource dataSource = 
Ioc.ins().getBean(Constant.LOG_STORAGE_SERV_BEAN_PRE + 
logStoreDO.getEsClusterId());
+        if (dataSource == null) {
+            log.error("DataSource not found for clusterId={}", 
logStoreDO.getEsClusterId());
+            return Collections.emptyList();
+        }
+
+        StringBuilder sqlBuilder = new StringBuilder("SELECT * FROM " + 
logStoreDO.getEsIndex() + " WHERE ")
+                .append("project_id = ? AND ")
+                .append("env_id = ? AND ")
+                .append(LOG_TIME_FIELD + " >= ? AND ")
+                .append(LOG_TIME_FIELD + " <= ?");
+
+        if (StringUtils.isNotBlank(filterOptions.getTraceId())) {
+            sqlBuilder.append(" AND " + TRACE_ID_FIELD + " = ?");
+        }
+        if (StringUtils.isNotBlank(filterOptions.getLevel())) {
+            sqlBuilder.append(" AND " + LEVEL_FIELD + " = ?");
+        }
+
+        sqlBuilder.append(" ORDER BY " + LOG_TIME_FIELD + " DESC LIMIT " + 
QUERY_LIMIT);
+
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sqlBuilder.toString())) {
+
+            setPreparedStatementParameters(preparedStatement, filterOptions);
+            return executeDorisQuery(preparedStatement);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute Doris query", e);
+        }
+    }
+
+    private void setPreparedStatementParameters(PreparedStatement 
preparedStatement, LogFilterOptions filterOptions) throws SQLException {
+        preparedStatement.setLong(1, filterOptions.getProjectId());
+        preparedStatement.setLong(2, filterOptions.getEnvId());
+        preparedStatement.setString(3, filterOptions.getStartTime());
+        preparedStatement.setString(4, filterOptions.getEndTime());
+
+        int paramIndex = 5;
+        if (StringUtils.isNotBlank(filterOptions.getTraceId())) {
+            preparedStatement.setString(paramIndex++, 
filterOptions.getTraceId());
+        }
+        if (StringUtils.isNotBlank(filterOptions.getLevel())) {
+            preparedStatement.setString(paramIndex, filterOptions.getLevel());
+        }
+    }
+
+    private List<Map<String, Object>> executeDorisQuery(PreparedStatement 
preparedStatement) throws SQLException {
+        List<Map<String, Object>> logs = new ArrayList<>();
+        try (ResultSet resultSet = preparedStatement.executeQuery()) {
+            java.sql.ResultSetMetaData metaData = resultSet.getMetaData();
+            int columnCount = metaData.getColumnCount();
+            while (resultSet.next()) {
+                Map<String, Object> logEntry = new HashMap<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    String columnName = metaData.getColumnName(i);
+                    Object columnValue = resultSet.getObject(i);
+                    logEntry.put(columnName, columnValue);
+                }
+                logs.add(logEntry);
+            }
+        }
+        return logs;
+    }
+
+    private SearchSourceBuilder buildSearchSourceBuilder(LogFilterOptions 
filterOptions, MilogLogStoreDO logStoreDO) {
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
+                .filter(QueryBuilders.rangeQuery("timestamp")
+                        .from(filterOptions.getStartTime())
+                        .to(filterOptions.getEndTime()))
+                .filter(QueryBuilders.termQuery("storeId", 
logStoreDO.getId()));
+
+        if (StringUtils.isNotBlank(filterOptions.getLevel())) {
+            boolQueryBuilder.filter(QueryBuilders.matchPhraseQuery("level", 
filterOptions.getLevel()));
+        }
+
+        if (StringUtils.isNotBlank(filterOptions.getTraceId())) {
+            boolQueryBuilder.filter(QueryBuilders.matchPhraseQuery("traceId", 
filterOptions.getTraceId()));
+        }
+
+        return new SearchSourceBuilder().query(boolQueryBuilder)
+                .sort("timestamp", SortOrder.DESC)
+                .from(0)
+                .size(20)
+                .timeout(TimeValue.timeValueMinutes(2L));
+    }
+
+    private List<Map<String, Object>> 
extractLogDataFromResponse(SearchResponse searchResponse) {
+        return Arrays.stream(searchResponse.getHits().getHits())
+                .map(SearchHit::getSourceAsMap)
+                .collect(Collectors.toList());
+    }
+
+    private String buildCommonParam(Pair<Long, Long> pair, String keyword) {
+        if (StringUtils.isEmpty(keyword)) {
+            return String.format("spaceId=%s&storeId=%s&type=search",
+                    pair.getKey(), pair.getValue());
+        }
+        return String.format("spaceId=%s&storeId=%s&type=search&inputV=%s",
+                pair.getKey(), pair.getValue(), keyword);
+    }
+
+    private String buildTimeParam(long curTimestamp, long fiveMinutesInMillis) 
{
+        long startTime = curTimestamp - fiveMinutesInMillis;
+        long endTime = curTimestamp + fiveMinutesInMillis;
+        return String.format("&startTime=%s&endTime=%s", startTime, endTime);
+    }
+
+    private String buildUrl(String commonParam, String timeParam) {
+        return new StringBuilder(heraUrl)
+                .append("/project-milog/user/space-tree?")
+                .append(commonParam)
+                .append(timeParam)
+                .toString();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to