This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 99f1b8d875 [INLONG-10034][Manager] Support calling the audit service
interface to query audit information (#10037)
99f1b8d875 is described below
commit 99f1b8d8759487cb121f698336cb45c1b0c618cf
Author: fuweng11 <[email protected]>
AuthorDate: Mon Apr 22 18:28:02 2024 +0800
[INLONG-10034][Manager] Support calling the audit service interface to
query audit information (#10037)
---
.../inlong/manager/pojo/audit/AuditResponse.java | 50 ++++++
.../manager/service/audit/AuditRunnable.java | 185 +++++++++++++++++++++
.../service/core/impl/AuditServiceImpl.java | 92 +++-------
.../src/main/resources/application-dev.properties | 2 +
.../src/main/resources/application-prod.properties | 2 +
.../src/main/resources/application-test.properties | 2 +
6 files changed, 265 insertions(+), 68 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditResponse.java
new file mode 100644
index 0000000000..280189878d
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.audit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Audit response
+ */
+@Data
+public class AuditResponse {
+
+ private Boolean success;
+ private String errMsg;
+ private List<AuditItem> data;
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class AuditItem {
+
+ private String logTs;
+ private String inlongGroupId;
+ private String inlongStreamId;
+ private String auditId;
+ private String auditTag;
+ private Long count;
+ private Long size;
+ private Long delay;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
new file mode 100644
index 0000000000..aa74fed7e8
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditRunnable.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.audit;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditResponse;
+import org.apache.inlong.manager.pojo.audit.AuditVO;
+
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpMethod;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Audit query Runnable
+ */
+public class AuditRunnable implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditRunnable.class);
+
+ private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ private static final String DAY_FORMAT = "yyyy-MM-dd";
+ private static final DateTimeFormatter SECOND_DATE_FORMATTER =
DateTimeFormat.forPattern(SECOND_FORMAT);
+ private static final DateTimeFormatter DAY_DATE_FORMATTER =
DateTimeFormat.forPattern(DAY_FORMAT);
+
+ private AuditRequest request;
+ private String auditId;
+ private String auditName;
+ private List<AuditVO> auditVOList;
+ private CountDownLatch latch;
+ private RestTemplate restTemplate;
+ private String auditUrl;
+ private Map<String, String> auditIdMap;
+ private Boolean listAll;
+
+ public AuditRunnable(
+ AuditRequest request,
+ String auditId,
+ String auditName,
+ List<AuditVO> auditVOList,
+ CountDownLatch latch,
+ RestTemplate restTemplate,
+ String auditUrl,
+ Map<String, String> auditIdMap,
+ Boolean listAll) {
+ this.request = request;
+ this.auditId = auditId;
+ this.auditName = auditName;
+ this.auditVOList = auditVOList;
+ this.latch = latch;
+ this.restTemplate = restTemplate;
+ this.auditUrl = auditUrl;
+ this.auditIdMap = auditIdMap == null ? new HashMap<>() : auditIdMap;
+ this.listAll = listAll;
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<AuditInfo> auditSet;
+ if (listAll) {
+ auditSet = getAuditInfoListByIp(request,
request.getInlongGroupId(), request.getInlongStreamId(),
+ request.getIp(), auditId);
+ } else {
+ auditSet = getAuditInfoList(request,
request.getInlongGroupId(), request.getInlongStreamId(), auditId);
+ }
+ auditVOList.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
+ } catch (Exception e) {
+ LOGGER.error("query audit failed for request={}", request);
+ throw new BusinessException("query audit failed");
+ } finally {
+ this.latch.countDown();
+ }
+
+ }
+
+ private List<AuditInfo> getAuditInfoListByIp(AuditRequest request, String
groupId, String streamId, String ip,
+ String auditId) {
+ List<AuditInfo> auditSet = new ArrayList<>();
+ try {
+ String start = request.getStartDate();
+ String end = request.getEndDate();
+ if (StringUtils.isBlank(request.getEndDate())) {
+ end =
SECOND_DATE_FORMATTER.parseDateTime(request.getEndDate()).plusDays(1).toString(SECOND_FORMAT);
+ }
+ StringBuilder builder = new StringBuilder();
+ builder.append(auditUrl);
+ if (StringUtils.isNotBlank(ip)) {
+ builder.append("/audit/query/getIps?")
+ .append("&ip=").append(ip);
+ } else {
+ builder.append("/audit/query/minutes?")
+ .append("&inlongGroupId=").append(groupId)
+ .append("&inlongStreamId=").append(streamId);
+ }
+ builder.append("startTime=").append(start)
+ .append("&endTime=").append(end)
+ .append("&auditId=").append(auditId);
+ String url = builder.toString();
+ LOGGER.info("query audit url ={}", url);
+ AuditResponse result = HttpUtils.request(restTemplate,
+ url,
+ HttpMethod.GET, null,
+ null,
+ AuditResponse.class);
+ LOGGER.info("success to query audit info result={}", result);
+ return CommonBeanUtils.copyListProperties(result.getData(),
AuditInfo::new);
+ } catch (Exception e) {
+ LOGGER.info("query audit failed for request={}", request, e);
+ }
+
+ return auditSet;
+ }
+
+ private List<AuditInfo> getAuditInfoList(AuditRequest request, String
groupId, String streamId, String auditId) {
+ List<AuditInfo> auditSet = new ArrayList<>();
+ try {
+ String start =
DAY_DATE_FORMATTER.parseDateTime(request.getStartDate()).toString(SECOND_FORMAT);
+ String end =
DAY_DATE_FORMATTER.parseDateTime(request.getStartDate()).plusDays(1).toString(SECOND_FORMAT);
+ if (StringUtils.isNotBlank(request.getEndDate())) {
+ end =
DAY_DATE_FORMATTER.parseDateTime(request.getEndDate()).plusDays(1).toString(SECOND_FORMAT);
+ }
+ StringBuilder builder = new StringBuilder();
+ builder.append(auditUrl);
+ switch (request.getTimeStaticsDim()) {
+ case HOUR:
+ builder.append("/audit/query/hour?");
+ break;
+ case DAY:
+ builder.append("/audit/query/day?");
+ break;
+ default:
+ builder.append("/audit/query/minutes?");
+ break;
+ }
+
+ builder.append("startTime=").append(start)
+ .append("&endTime=").append(end)
+ .append("&auditId=").append(auditId)
+ .append("&inlongGroupId=").append(groupId)
+ .append("&inlongStreamId=").append(streamId)
+ .append("&auditCycle=1");
+ String url = builder.toString();
+ AuditResponse result = HttpUtils.request(restTemplate,
+ url,
+ HttpMethod.GET, null,
+ null,
+ AuditResponse.class);
+ LOGGER.info("success to query audit info result={}", result);
+ return CommonBeanUtils.copyListProperties(result.getData(),
AuditInfo::new);
+ } catch (Exception e) {
+ LOGGER.info("query audit failed for request={}", request, e);
+ }
+
+ return auditSet;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 079812536e..a7ae4e4299 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -54,6 +54,7 @@ import
org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo.SortVal
import org.apache.inlong.manager.pojo.node.es.ElasticsearchRequest;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
+import org.apache.inlong.manager.service.audit.AuditRunnable;
import org.apache.inlong.manager.service.audit.InlongAuditSourceOperator;
import
org.apache.inlong.manager.service.audit.InlongAuditSourceOperatorFactory;
import org.apache.inlong.manager.service.core.AuditService;
@@ -64,7 +65,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -78,13 +78,13 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -96,6 +96,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -133,6 +137,8 @@ public class AuditServiceImpl implements AuditService {
private static final String DELAY = "delay";
private static final String TERMS = "terms";
+ private ScheduledExecutorService executor =
Executors.newScheduledThreadPool(10);
+
// key 1: type of audit, like pulsar, hive, key 2: indicator type, value :
entity of audit base item
private final Map<String, Map<Integer, AuditBaseEntity>> auditIndicatorMap
= new ConcurrentHashMap<>();
@@ -147,6 +153,8 @@ public class AuditServiceImpl implements AuditService {
@Value("${audit.query.source}")
private String auditQuerySource;
+ @Value("${audit.query.url:http://127.0.0.1:10080}")
+ private String auditQueryUrl;
@Autowired
private AuditBaseEntityMapper auditBaseMapper;
@@ -166,6 +174,8 @@ public class AuditServiceImpl implements AuditService {
private InlongGroupEntityMapper inlongGroupMapper;
@Autowired
private InlongAuditSourceOperatorFactory auditSourceOperatorFactory;
+ @Autowired
+ private RestTemplate restTemplate;
@PostConstruct
public void initialize() {
@@ -304,6 +314,7 @@ public class AuditServiceImpl implements AuditService {
List<AuditVO> result = new ArrayList<>();
AuditQuerySource querySource =
AuditQuerySource.valueOf(auditQuerySource);
+ CountDownLatch latch = new
CountDownLatch(request.getAuditIds().size());
for (String auditId : request.getAuditIds()) {
AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
String auditName = auditBaseEntity != null ?
auditBaseEntity.getName() : "";
@@ -329,63 +340,25 @@ public class AuditServiceImpl implements AuditService {
return vo;
}).collect(Collectors.toList());
result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
- } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
- String index = String.format("%s_%s",
request.getStartDate().replaceAll("-", ""), auditId);
- if (!elasticsearchApi.indexExists(index)) {
- LOGGER.warn("elasticsearch index={} not exists", index);
- continue;
- }
- JsonObject response = elasticsearchApi.search(index,
toAuditSearchRequestJson(groupId, streamId));
- JsonObject aggregations =
response.getAsJsonObject(AGGREGATIONS).getAsJsonObject(TERM_FILED);
- if (!aggregations.isJsonNull()) {
- JsonObject logTs =
aggregations.getAsJsonObject(TERM_FILED);
- if (!logTs.isJsonNull()) {
- JsonArray buckets = logTs.getAsJsonArray(BUCKETS);
- List<AuditInfo> auditSet = new ArrayList<>();
- for (int i = 0; i < buckets.size(); i++) {
- JsonObject bucket =
buckets.get(i).getAsJsonObject();
- AuditInfo vo = new AuditInfo();
- vo.setLogTs(bucket.get(KEY).getAsString());
- vo.setCount((long)
bucket.get(AGGREGATIONS_COUNT).getAsJsonObject().get(VALUE)
- .getAsLong());
- vo.setDelay((long)
bucket.get(AGGREGATIONS_DELAY).getAsJsonObject().get(VALUE)
- .getAsLong());
- auditSet.add(vo);
- }
- result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
- }
- }
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
- try (Connection connection = config.getCkConnection();
- PreparedStatement statement =
getAuditCkStatementGroupByLogTs(connection, groupId, streamId,
- request.getIp(), auditId,
- request.getStartDate(),
- request.getEndDate());
-
- ResultSet resultSet = statement.executeQuery()) {
- List<AuditInfo> auditSet = new ArrayList<>();
- while (resultSet.next()) {
- AuditInfo vo = new AuditInfo();
-
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
-
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
- vo.setLogTs(resultSet.getString("log_ts"));
- vo.setCount(resultSet.getLong("total"));
- vo.setDelay(resultSet.getLong("total_delay"));
- vo.setSize(resultSet.getLong("total_size"));
- auditSet.add(vo);
- }
- result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
- }
+ this.executor.execute(new AuditRunnable(request, auditId,
auditName, result, latch, restTemplate,
+ auditQueryUrl, auditIdMap, false));
}
}
+ if (AuditQuerySource.CLICKHOUSE == querySource) {
+ latch.await(30, TimeUnit.SECONDS);
+ } else {
+ result = aggregateByTimeDim(result, request.getTimeStaticsDim());
+ }
LOGGER.info("success to query audit list for request={}", request);
- return aggregateByTimeDim(result, request.getTimeStaticsDim());
+ return result;
}
@Override
public List<AuditVO> listAll(AuditRequest request) throws Exception {
List<AuditVO> result = new ArrayList<>();
AuditQuerySource querySource =
AuditQuerySource.valueOf(auditQuerySource);
+ CountDownLatch latch = new
CountDownLatch(request.getAuditIds().size());
for (String auditId : request.getAuditIds()) {
AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
String auditName = "";
@@ -412,25 +385,8 @@ public class AuditServiceImpl implements AuditService {
}).collect(Collectors.toList());
result.add(new AuditVO(auditId, auditName, auditSet, null));
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
- try (Connection connection = config.getCkConnection();
- PreparedStatement statement =
getAuditCkStatementGroupByIp(connection,
- request.getInlongGroupId(),
request.getInlongStreamId(), request.getIp(), auditId,
- request.getStartDate(), request.getEndDate());
-
- ResultSet resultSet = statement.executeQuery()) {
- List<AuditInfo> auditSet = new ArrayList<>();
- while (resultSet.next()) {
- AuditInfo vo = new AuditInfo();
-
vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
-
vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
- vo.setIp(resultSet.getString("ip"));
- vo.setCount(resultSet.getLong("total"));
- vo.setDelay(resultSet.getLong("total_delay"));
- vo.setSize(resultSet.getLong("total_size"));
- auditSet.add(vo);
- }
- result.add(new AuditVO(auditId, auditName, auditSet,
null));
- }
+ this.executor.execute(new AuditRunnable(request, auditId,
auditName, result, latch, restTemplate,
+ auditQueryUrl, null, true));
}
}
return result;
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index ec773d0442..e92862ca3e 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -52,6 +52,8 @@
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearc
# Audit configuration
# Audit query source that decide what data source to query, currently only
supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
audit.query.source=MYSQL
+# Audit query url
+audit.query.url=http://127.0.0.1:10080
# Elasticsearch config
# Elasticsearch host split by coma if more than one host, such as 'host1,host2'
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 920eec020d..8ae818be72 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -51,6 +51,8 @@
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearc
# Audit configuration
# Audit query source that decide what data source to query, currently only
supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
audit.query.source=MYSQL
+# Audit query url
+audit.query.url=http://127.0.0.1:10080
# Elasticsearch config
# Elasticsearch host split by coma if more than one host, such as 'host1,host2'
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index c8323e5592..ba694a7b09 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -52,6 +52,8 @@
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearc
# Audit configuration
# Audit query source that decide what data source to query, currently only
supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
audit.query.source=MYSQL
+# Audit query url
+audit.query.url=http://127.0.0.1:10080
# Elasticsearch config
# Elasticsearch host split by coma if more than one host, such as 'host1,host2'