This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 99f1b8d875 [INLONG-10034][Manager] Support calling the audit service 
interface to query audit information (#10037)
99f1b8d875 is described below

commit 99f1b8d8759487cb121f698336cb45c1b0c618cf
Author: fuweng11 <[email protected]>
AuthorDate: Mon Apr 22 18:28:02 2024 +0800

    [INLONG-10034][Manager] Support calling the audit service interface to 
query audit information (#10037)
---
 .../inlong/manager/pojo/audit/AuditResponse.java   |  50 ++++++
 .../manager/service/audit/AuditRunnable.java       | 185 +++++++++++++++++++++
 .../service/core/impl/AuditServiceImpl.java        |  92 +++-------
 .../src/main/resources/application-dev.properties  |   2 +
 .../src/main/resources/application-prod.properties |   2 +
 .../src/main/resources/application-test.properties |   2 +
 6 files changed, 265 insertions(+), 68 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditResponse.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditResponse.java
new file mode 100644
index 0000000000..280189878d
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.audit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Audit response
+ */
+@Data
+public class AuditResponse {
+
+    private Boolean success;
+    private String errMsg;
+    private List<AuditItem> data;
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class AuditItem {
+
+        private String logTs;
+        private String inlongGroupId;
+        private String inlongStreamId;
+        private String auditId;
+        private String auditTag;
+        private Long count;
+        private Long size;
+        private Long delay;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
new file mode 100644
index 0000000000..aa74fed7e8
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.audit;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditResponse;
+import org.apache.inlong.manager.pojo.audit.AuditVO;
+
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpMethod;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Audit query Runnable
+ */
+public class AuditRunnable implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditRunnable.class);
+
+    private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    private static final String DAY_FORMAT = "yyyy-MM-dd";
+    private static final DateTimeFormatter SECOND_DATE_FORMATTER = 
DateTimeFormat.forPattern(SECOND_FORMAT);
+    private static final DateTimeFormatter DAY_DATE_FORMATTER = 
DateTimeFormat.forPattern(DAY_FORMAT);
+
+    private AuditRequest request;
+    private String auditId;
+    private String auditName;
+    private List<AuditVO> auditVOList;
+    private CountDownLatch latch;
+    private RestTemplate restTemplate;
+    private String auditUrl;
+    private Map<String, String> auditIdMap;
+    private Boolean listAll;
+
+    public AuditRunnable(
+            AuditRequest request,
+            String auditId,
+            String auditName,
+            List<AuditVO> auditVOList,
+            CountDownLatch latch,
+            RestTemplate restTemplate,
+            String auditUrl,
+            Map<String, String> auditIdMap,
+            Boolean listAll) {
+        this.request = request;
+        this.auditId = auditId;
+        this.auditName = auditName;
+        this.auditVOList = auditVOList;
+        this.latch = latch;
+        this.restTemplate = restTemplate;
+        this.auditUrl = auditUrl;
+        this.auditIdMap = auditIdMap == null ? new HashMap<>() : auditIdMap;
+        this.listAll = listAll;
+    }
+
+    @Override
+    public void run() {
+        try {
+            List<AuditInfo> auditSet;
+            if (listAll) {
+                auditSet = getAuditInfoListByIp(request, 
request.getInlongGroupId(), request.getInlongStreamId(),
+                        request.getIp(), auditId);
+            } else {
+                auditSet = getAuditInfoList(request, 
request.getInlongGroupId(), request.getInlongStreamId(), auditId);
+            }
+            auditVOList.add(new AuditVO(auditId, auditName, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
+        } catch (Exception e) {
+            LOGGER.error("query audit failed for request={}", request);
+            throw new BusinessException("query audit failed");
+        } finally {
+            this.latch.countDown();
+        }
+
+    }
+
+    private List<AuditInfo> getAuditInfoListByIp(AuditRequest request, String 
groupId, String streamId, String ip,
+            String auditId) {
+        List<AuditInfo> auditSet = new ArrayList<>();
+        try {
+            String start = request.getStartDate();
+            String end = request.getEndDate();
+            if (StringUtils.isBlank(request.getEndDate())) {
+                end = 
SECOND_DATE_FORMATTER.parseDateTime(request.getEndDate()).plusDays(1).toString(SECOND_FORMAT);
+            }
+            StringBuilder builder = new StringBuilder();
+            builder.append(auditUrl);
+            if (StringUtils.isNotBlank(ip)) {
+                builder.append("/audit/query/getIps?")
+                        .append("&ip=").append(ip);
+            } else {
+                builder.append("/audit/query/minutes?")
+                        .append("&inlongGroupId=").append(groupId)
+                        .append("&inlongStreamId=").append(streamId);
+            }
+            builder.append("startTime=").append(start)
+                    .append("&endTime=").append(end)
+                    .append("&auditId=").append(auditId);
+            String url = builder.toString();
+            LOGGER.info("query audit url ={}", url);
+            AuditResponse result = HttpUtils.request(restTemplate,
+                    url,
+                    HttpMethod.GET, null,
+                    null,
+                    AuditResponse.class);
+            LOGGER.info("success to query audit info result={}", result);
+            return CommonBeanUtils.copyListProperties(result.getData(), 
AuditInfo::new);
+        } catch (Exception e) {
+            LOGGER.info("query audit failed for request={}", request, e);
+        }
+
+        return auditSet;
+    }
+
+    private List<AuditInfo> getAuditInfoList(AuditRequest request, String 
groupId, String streamId, String auditId) {
+        List<AuditInfo> auditSet = new ArrayList<>();
+        try {
+            String start = 
DAY_DATE_FORMATTER.parseDateTime(request.getStartDate()).toString(SECOND_FORMAT);
+            String end = 
DAY_DATE_FORMATTER.parseDateTime(request.getStartDate()).plusDays(1).toString(SECOND_FORMAT);
+            if (StringUtils.isNotBlank(request.getEndDate())) {
+                end = 
DAY_DATE_FORMATTER.parseDateTime(request.getEndDate()).plusDays(1).toString(SECOND_FORMAT);
+            }
+            StringBuilder builder = new StringBuilder();
+            builder.append(auditUrl);
+            switch (request.getTimeStaticsDim()) {
+                case HOUR:
+                    builder.append("/audit/query/hour?");
+                    break;
+                case DAY:
+                    builder.append("/audit/query/day?");
+                    break;
+                default:
+                    builder.append("/audit/query/minutes?");
+                    break;
+            }
+
+            builder.append("startTime=").append(start)
+                    .append("&endTime=").append(end)
+                    .append("&auditId=").append(auditId)
+                    .append("&inlongGroupId=").append(groupId)
+                    .append("&inlongStreamId=").append(streamId)
+                    .append("&auditCycle=1");
+            String url = builder.toString();
+            AuditResponse result = HttpUtils.request(restTemplate,
+                    url,
+                    HttpMethod.GET, null,
+                    null,
+                    AuditResponse.class);
+            LOGGER.info("success to query audit info result={}", result);
+            return CommonBeanUtils.copyListProperties(result.getData(), 
AuditInfo::new);
+        } catch (Exception e) {
+            LOGGER.info("query audit failed for request={}", request, e);
+        }
+
+        return auditSet;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 079812536e..a7ae4e4299 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -54,6 +54,7 @@ import 
org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo.SortVal
 import org.apache.inlong.manager.pojo.node.es.ElasticsearchRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
+import org.apache.inlong.manager.service.audit.AuditRunnable;
 import org.apache.inlong.manager.service.audit.InlongAuditSourceOperator;
 import 
org.apache.inlong.manager.service.audit.InlongAuditSourceOperatorFactory;
 import org.apache.inlong.manager.service.core.AuditService;
@@ -64,7 +65,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -78,13 +78,13 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
 
 import javax.annotation.PostConstruct;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -96,6 +96,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -133,6 +137,8 @@ public class AuditServiceImpl implements AuditService {
     private static final String DELAY = "delay";
     private static final String TERMS = "terms";
 
+    private ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
+
     // key 1: type of audit, like pulsar, hive, key 2: indicator type, value : 
entity of audit base item
     private final Map<String, Map<Integer, AuditBaseEntity>> auditIndicatorMap 
= new ConcurrentHashMap<>();
 
@@ -147,6 +153,8 @@ public class AuditServiceImpl implements AuditService {
 
     @Value("${audit.query.source}")
     private String auditQuerySource;
+    @Value("${audit.query.url:http://127.0.0.1:10080}";)
+    private String auditQueryUrl;
 
     @Autowired
     private AuditBaseEntityMapper auditBaseMapper;
@@ -166,6 +174,8 @@ public class AuditServiceImpl implements AuditService {
     private InlongGroupEntityMapper inlongGroupMapper;
     @Autowired
     private InlongAuditSourceOperatorFactory auditSourceOperatorFactory;
+    @Autowired
+    private RestTemplate restTemplate;
 
     @PostConstruct
     public void initialize() {
@@ -304,6 +314,7 @@ public class AuditServiceImpl implements AuditService {
 
         List<AuditVO> result = new ArrayList<>();
         AuditQuerySource querySource = 
AuditQuerySource.valueOf(auditQuerySource);
+        CountDownLatch latch = new 
CountDownLatch(request.getAuditIds().size());
         for (String auditId : request.getAuditIds()) {
             AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
             String auditName = auditBaseEntity != null ? 
auditBaseEntity.getName() : "";
@@ -329,63 +340,25 @@ public class AuditServiceImpl implements AuditService {
                     return vo;
                 }).collect(Collectors.toList());
                 result.add(new AuditVO(auditId, auditName, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
-            } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
-                String index = String.format("%s_%s", 
request.getStartDate().replaceAll("-", ""), auditId);
-                if (!elasticsearchApi.indexExists(index)) {
-                    LOGGER.warn("elasticsearch index={} not exists", index);
-                    continue;
-                }
-                JsonObject response = elasticsearchApi.search(index, 
toAuditSearchRequestJson(groupId, streamId));
-                JsonObject aggregations = 
response.getAsJsonObject(AGGREGATIONS).getAsJsonObject(TERM_FILED);
-                if (!aggregations.isJsonNull()) {
-                    JsonObject logTs = 
aggregations.getAsJsonObject(TERM_FILED);
-                    if (!logTs.isJsonNull()) {
-                        JsonArray buckets = logTs.getAsJsonArray(BUCKETS);
-                        List<AuditInfo> auditSet = new ArrayList<>();
-                        for (int i = 0; i < buckets.size(); i++) {
-                            JsonObject bucket = 
buckets.get(i).getAsJsonObject();
-                            AuditInfo vo = new AuditInfo();
-                            vo.setLogTs(bucket.get(KEY).getAsString());
-                            vo.setCount((long) 
bucket.get(AGGREGATIONS_COUNT).getAsJsonObject().get(VALUE)
-                                    .getAsLong());
-                            vo.setDelay((long) 
bucket.get(AGGREGATIONS_DELAY).getAsJsonObject().get(VALUE)
-                                    .getAsLong());
-                            auditSet.add(vo);
-                        }
-                        result.add(new AuditVO(auditId, auditName, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
-                    }
-                }
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
-                try (Connection connection = config.getCkConnection();
-                        PreparedStatement statement = 
getAuditCkStatementGroupByLogTs(connection, groupId, streamId,
-                                request.getIp(), auditId,
-                                request.getStartDate(),
-                                request.getEndDate());
-
-                        ResultSet resultSet = statement.executeQuery()) {
-                    List<AuditInfo> auditSet = new ArrayList<>();
-                    while (resultSet.next()) {
-                        AuditInfo vo = new AuditInfo();
-                        
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
-                        
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
-                        vo.setLogTs(resultSet.getString("log_ts"));
-                        vo.setCount(resultSet.getLong("total"));
-                        vo.setDelay(resultSet.getLong("total_delay"));
-                        vo.setSize(resultSet.getLong("total_size"));
-                        auditSet.add(vo);
-                    }
-                    result.add(new AuditVO(auditId, auditName, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
-                }
+                this.executor.execute(new AuditRunnable(request, auditId, 
auditName, result, latch, restTemplate,
+                        auditQueryUrl, auditIdMap, false));
             }
         }
+        if (AuditQuerySource.CLICKHOUSE == querySource) {
+            latch.await(30, TimeUnit.SECONDS);
+        } else {
+            result = aggregateByTimeDim(result, request.getTimeStaticsDim());
+        }
         LOGGER.info("success to query audit list for request={}", request);
-        return aggregateByTimeDim(result, request.getTimeStaticsDim());
+        return result;
     }
 
     @Override
     public List<AuditVO> listAll(AuditRequest request) throws Exception {
         List<AuditVO> result = new ArrayList<>();
         AuditQuerySource querySource = 
AuditQuerySource.valueOf(auditQuerySource);
+        CountDownLatch latch = new 
CountDownLatch(request.getAuditIds().size());
         for (String auditId : request.getAuditIds()) {
             AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
             String auditName = "";
@@ -412,25 +385,8 @@ public class AuditServiceImpl implements AuditService {
                 }).collect(Collectors.toList());
                 result.add(new AuditVO(auditId, auditName, auditSet, null));
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
-                try (Connection connection = config.getCkConnection();
-                        PreparedStatement statement = 
getAuditCkStatementGroupByIp(connection,
-                                request.getInlongGroupId(), 
request.getInlongStreamId(), request.getIp(), auditId,
-                                request.getStartDate(), request.getEndDate());
-
-                        ResultSet resultSet = statement.executeQuery()) {
-                    List<AuditInfo> auditSet = new ArrayList<>();
-                    while (resultSet.next()) {
-                        AuditInfo vo = new AuditInfo();
-                        
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
-                        
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
-                        vo.setIp(resultSet.getString("ip"));
-                        vo.setCount(resultSet.getLong("total"));
-                        vo.setDelay(resultSet.getLong("total_delay"));
-                        vo.setSize(resultSet.getLong("total_size"));
-                        auditSet.add(vo);
-                    }
-                    result.add(new AuditVO(auditId, auditName, auditSet, 
null));
-                }
+                this.executor.execute(new AuditRunnable(request, auditId, 
auditName, result, latch, restTemplate,
+                        auditQueryUrl, null, true));
             }
         }
         return result;
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index ec773d0442..e92862ca3e 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -52,6 +52,8 @@ 
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearc
 # Audit configuration
 # Audit query source that decide what data source to query, currently only 
supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
 audit.query.source=MYSQL
+# Audit query url
+audit.query.url=http://127.0.0.1:10080
 
 # Elasticsearch config
 # Elasticsearch host split by coma if more than one host, such as 'host1,host2'
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 920eec020d..8ae818be72 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -51,6 +51,8 @@ 
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearc
 # Audit configuration
 # Audit query source that decide what data source to query, currently only 
supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
 audit.query.source=MYSQL
+# Audit query url
+audit.query.url=http://127.0.0.1:10080
 
 # Elasticsearch config
 # Elasticsearch host split by coma if more than one host, such as 'host1,host2'
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index c8323e5592..ba694a7b09 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -52,6 +52,8 @@ 
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearc
 # Audit configuration
 # Audit query source that decide what data source to query, currently only 
supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
 audit.query.source=MYSQL
+# Audit query url
+audit.query.url=http://127.0.0.1:10080
 
 # Elasticsearch config
 # Elasticsearch host split by coma if more than one host, such as 'host1,host2'

Reply via email to