This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 09a1f073ec [INLONG-10321][Audit] Audit supports the Audit Proxy
service discovery and management (#10386)
09a1f073ec is described below
commit 09a1f073ec2829290b557c654b9451341b2eec53
Author: doleyzi <[email protected]>
AuthorDate: Tue Jun 11 19:22:25 2024 +0800
[INLONG-10321][Audit] Audit supports the Audit Proxy service discovery and
management (#10386)
* Audit supports the Audit Proxy service discovery and management
* ADD SQL
---
.../inlong/audit/entities/AuditComponent.java} | 17 +-
.../apache/inlong/audit/file/ConfigManager.java | 8 +
.../apache/inlong/audit/heartbeat/Heartbeat.java | 135 ++++++++++++
.../org/apache/inlong/audit/node/Application.java | 5 +
.../apache/inlong/audit/cache/AuditProxyCache.java | 234 +++++++++++++++++++++
.../inlong/audit/config/OpenApiConstants.java | 11 +-
.../apache/inlong/audit/config/SqlConstants.java | 61 ++++++
.../org/apache/inlong/audit/entities/ApiType.java | 2 +-
.../entities/{ApiType.java => AuditProxy.java} | 14 +-
.../entities/{ApiType.java => Heartbeat.java} | 15 +-
.../inlong/audit/heartbeat/ProxyHeartbeat.java | 96 +++++++++
.../apache/inlong/audit/service/ApiService.java | 57 ++++-
inlong-audit/sql/apache_inlong_audit_mysql.sql | 27 +++
13 files changed, 655 insertions(+), 27 deletions(-)
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entities/AuditComponent.java
similarity index 73%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entities/AuditComponent.java
index 08ba84b7c7..d7e077e3fa 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entities/AuditComponent.java
@@ -17,9 +17,16 @@
package org.apache.inlong.audit.entities;
-/**
- * OpenAPI type
- */
-public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+public enum AuditComponent {
+
+ AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"),
COMMON_AUDIT("Common");
+ private final String component;
+
+ AuditComponent(String component) {
+ this.component = component;
+ }
+
+ public String getComponent() {
+ return component;
+ }
}
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
index 2847836da2..68745efd23 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -94,6 +94,14 @@ public class ConfigManager {
return null;
}
+ public String getValue(String key) {
+ ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES);
+ if (holder != null) {
+ return holder.getHolder().get(key);
+ }
+ return null;
+ }
+
private boolean updatePropertiesHolder(Map<String, String> result,
String holderName, boolean addElseRemove) {
if (StringUtils.isNotEmpty(holderName)) {
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
new file mode 100644
index 0000000000..ce921f6051
--- /dev/null
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.heartbeat;
+
+import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.common.util.NetworkUtils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.audit.entities.AuditComponent.COMMON_AUDIT;
+
+public class Heartbeat {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Heartbeat.class);
+ private final static String HEARTBEAT_PATH = "/audit/proxy/heartbeat";
+ private static final String AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY =
"audit.heartbeat.interval";
+ private static final String AUDIT_SERVICE_HOST_CONFIG_KEY =
"audit.service.host";
+ private static final String AUDIT_SERVICE_PORT_CONFIG_KEY =
"agent1.sources.tcp-source.port";
+ private static final String AUDIT_COMPONENT_CONFIG_KEY = "audit.component";
+ private String heartbeatHost;
+ private final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor();
+ private final String localIp;
+ private final String localPort;
+
+ public Heartbeat() {
+ localIp = NetworkUtils.getLocalIp();
+ localPort = getLocalPort();
+ }
+
+ public void Start() {
+ heartbeatHost = getConfiguredValue(AUDIT_SERVICE_HOST_CONFIG_KEY);
+ timer.scheduleWithFixedDelay(this::heartbeat,
+ 1,
+ getConfiguredInterval(),
+ TimeUnit.MINUTES);
+ }
+
+ private void heartbeat() {
+ if (heartbeatHost == null || localPort == null) {
+ LOGGER.info("Heartbeat is not configure, Don`t need heartbeat");
+ return;
+ }
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ URIBuilder uriBuilder = new URIBuilder("http://" + heartbeatHost +
HEARTBEAT_PATH);
+ uriBuilder.setParameter("component", getConfiguredComponent());
+ uriBuilder.setParameter("host", localIp);
+ uriBuilder.setParameter("port", localPort);
+
+ HttpGet httpGet = new HttpGet(uriBuilder.build());
+
+ try (CloseableHttpResponse response = httpClient.execute(httpGet))
{
+ String responseBody =
EntityUtils.toString(response.getEntity());
+ LOGGER.info("Heartbeat response: {}", responseBody);
+ }
+ } catch (Exception exception) {
+ LOGGER.error("Heartbeat has exception", exception);
+ }
+ }
+
+ private int getConfiguredInterval() {
+ String intervalConfigValue =
getConfiguredValue(AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY);
+ return intervalConfigValue != null ?
Integer.parseInt(intervalConfigValue) : 1;
+ }
+
+ private String getConfiguredComponent() {
+ String intervalConfigComponent =
getConfiguredValue(AUDIT_COMPONENT_CONFIG_KEY);
+ return intervalConfigComponent != null ? intervalConfigComponent :
COMMON_AUDIT.getComponent();
+ }
+
+ private String getConfiguredValue(String configKey) {
+ return ConfigManager.getInstance().getValue(configKey);
+ }
+
+ private String getLocalPort() {
+ try (Stream<Path> paths = Files.walk(Paths.get("."))) {
+ Path selectedPath = paths.filter(Files::isRegularFile)
+ .filter(p -> p.toString().endsWith(".conf"))
+ .findFirst()
+ .orElse(null);
+ if (selectedPath != null) {
+ String port = loadProperties(selectedPath);
+ LOGGER.info("File: {} , TCP Source Port: {}", selectedPath,
port);
+ if (port != null) {
+ return port;
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.error("Get local port has error", e);
+ }
+ return null;
+ }
+
+ private String loadProperties(Path path) {
+ Properties prop = new Properties();
+ try {
+ prop.load(Files.newInputStream(path));
+ return prop.getProperty(AUDIT_SERVICE_PORT_CONFIG_KEY);
+ } catch (IOException e) {
+ LOGGER.error("Load properties has error", e);
+ }
+ return null;
+ }
+}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index 4ab7f29e51..fcd32fb493 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -18,6 +18,7 @@
package org.apache.inlong.audit.node;
import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.heartbeat.Heartbeat;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -76,6 +77,8 @@ public class Application {
private MonitorService monitorServer;
private final ReentrantLock lifecycleLock = new ReentrantLock();
+ private static final Heartbeat heartbeat = new Heartbeat();
+
public Application() {
this(new ArrayList<LifecycleAware>(0));
}
@@ -335,6 +338,8 @@ public class Application {
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
+ heartbeat.Start();
+
// start application
application.start();
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
new file mode 100644
index 0000000000..b51c2b2ff1
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.cache;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditProxy;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.utils.JdbcUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_MINUTES;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_MINUTES;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
+
+public class AuditProxyCache {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditProxyCache.class);
+ private static final AuditProxyCache instance = new AuditProxyCache();
+ private final Cache<String, HashSet<AuditProxy>> cache;
+ protected final ScheduledExecutorService monitorTimer =
Executors.newSingleThreadScheduledExecutor();
+ private final BasicDataSource dataSource = new BasicDataSource();
+ private final String queryAllAuditProxyHostSQL;
+ private final String queryAuditProxyHostByComponentSQL;
+
+ private final String queryAuditProxyHeartbeatSQL;
+ private final String queryAuditProxyHeartbeatByComponentSQL;
+
+ private AuditProxyCache() {
+ cache = Caffeine.newBuilder()
+
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
+ DEFAULT_API_CACHE_MAX_SIZE))
+
.expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_MINUTES,
+ DEFAULT_API_CACHE_EXPIRED_MINUTES), TimeUnit.MINUTES)
+ .build();
+ queryAuditProxyHostByComponentSQL =
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL,
+ DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL);
+
+ queryAllAuditProxyHostSQL =
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL,
+ DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL);
+ queryAuditProxyHeartbeatSQL =
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL,
+ DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL);
+ queryAuditProxyHeartbeatByComponentSQL =
+
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL,
+ DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL);
+
+ initDataSource();
+ monitorTimer.scheduleWithFixedDelay(new Runnable() {
+
+ @Override
+ public void run() {
+ update();
+ }
+ }, 0, 1, TimeUnit.MINUTES);
+ }
+
+ private void initDataSource() {
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+ dataSource.setDriverClassName(jdbcConfig.getDriverClass());
+ dataSource.setUrl(jdbcConfig.getJdbcUrl());
+ dataSource.setUsername(jdbcConfig.getUserName());
+ dataSource.setPassword(jdbcConfig.getPassword());
+
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
+ DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
+
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
+
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+ dataSource.setTestOnBorrow(true);
+ dataSource.setValidationQuery("SELECT 1");
+ dataSource
+
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
+ DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+ }
+
+ public static AuditProxyCache getInstance() {
+ return instance;
+ }
+
+ public List<AuditProxy> getData(String component) {
+ HashSet<AuditProxy> result = cache.getIfPresent(component);
+ if (result != null) {
+ return new ArrayList<>(result);
+ }
+ result = queryAuditProxyInfo(component);
+ if (result.isEmpty()) {
+ result = queryAuditProxyHeartbeat(component);
+ }
+ if (!result.isEmpty()) {
+ cache.put(component, result);
+ }
+ return new ArrayList<>(result);
+ }
+
+ private void update() {
+ Map<String, HashSet<AuditProxy>> auditProxyInfo =
queryAllAuditProxyInfo();
+ if (auditProxyInfo.isEmpty()) {
+ auditProxyInfo = queryAuditProxyHeartbeat();
+ }
+ if (auditProxyInfo.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<String, HashSet<AuditProxy>> entry :
auditProxyInfo.entrySet()) {
+ try {
+ cache.put(entry.getKey(), entry.getValue());
+ } catch (Exception e) {
+ LOGGER.error("Put data into audit proxy cache has exception!
", e);
+ // Decide whether to break or continue based on your
requirement
+ break;
+ }
+ }
+ }
+
+ private Map<String, HashSet<AuditProxy>> queryAllAuditProxyInfo() {
+ Map<String, HashSet<AuditProxy>> result = new HashMap<>();
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(queryAllAuditProxyHostSQL);
+ ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ String component = resultSet.getString("component");
+ AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+ result.computeIfAbsent(component, k -> new
HashSet<>()).add(auditProxyInfo);
+ }
+ } catch (Exception exception) {
+ LOGGER.error("Query audit proxy info has exception! ", exception);
+ }
+ return result;
+ }
+
+ private HashSet<AuditProxy> queryAuditProxyInfo(String component) {
+ HashSet<AuditProxy> result = new HashSet<>();
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(queryAuditProxyHostByComponentSQL)) {
+ statement.setString(1, component);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+ result.add(auditProxyInfo);
+ }
+ } catch (SQLException sqlException) {
+ LOGGER.error("Query audit proxy info by {} has SQL exception
", component, sqlException);
+ }
+ } catch (Exception exception) {
+ LOGGER.error("Query audit proxy info by {} has exception ",
component, exception);
+ }
+ return result;
+ }
+
+ private Map<String, HashSet<AuditProxy>> queryAuditProxyHeartbeat() {
+ Map<String, HashSet<AuditProxy>> result = new HashMap<>();
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(queryAuditProxyHeartbeatSQL);
+ ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ String component = resultSet.getString("component");
+ AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+ result.computeIfAbsent(component, k -> new
HashSet<>()).add(auditProxyInfo);
+ }
+ } catch (Exception exception) {
+ LOGGER.error("Query audit proxy info has exception! ", exception);
+ }
+ return result;
+ }
+
+ private HashSet<AuditProxy> queryAuditProxyHeartbeat(String component) {
+ HashSet<AuditProxy> result = new HashSet<>();
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(queryAuditProxyHeartbeatByComponentSQL)) {
+ statement.setString(1, component);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+ result.add(auditProxyInfo);
+ }
+ } catch (SQLException sqlException) {
+ LOGGER.error("Query audit proxy info by {} has SQL exception
", component, sqlException);
+ }
+ } catch (Exception exception) {
+ LOGGER.error("Query audit proxy info by {} has exception ",
component, exception);
+ }
+ return result;
+ }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index 05643c43bf..1c22a28fad 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -33,6 +33,10 @@ public class OpenApiConstants {
public static final String DEFAULT_API_GET_IPS_PATH =
"/audit/query/getIps";
public static final String KEY_API_GET_IDS_PATH = "api.get.ids.path";
public static final String DEFAULT_API_GET_IDS_PATH =
"/audit/query/getIds";
+ public static final String KEY_API_GET_AUDIT_PROXY_PATH =
"api.get.audit.proxy";
+ public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH =
"/audit/query/getAuditProxy";
+ public static final String KEY_API_PROXY_HEART_BEAT_PATH =
"api.proxy.heartbeat";
+ public static final String DEFAULT_API_PROXY_HEART_BEAT_PATH =
"/audit/proxy/heartbeat";
public static final String KEY_API_THREAD_POOL_SIZE =
"api.thread.pool.size";
public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
@@ -47,6 +51,9 @@ public class OpenApiConstants {
public static final String KEY_API_CACHE_EXPIRED_HOURS =
"api.cache.expired.hours";
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
+ public static final String KEY_API_CACHE_EXPIRED_MINUTES =
"api.cache.expired.minutes";
+ public static final int DEFAULT_API_CACHE_EXPIRED_MINUTES = 1;
+
// Http config
public static final String PARAMS_START_TIME = "startTime";
public static final String PARAMS_END_TIME = "endTime";
@@ -64,5 +71,7 @@ public class OpenApiConstants {
public static final String KEY_HTTP_SERVER_BIND_PORT =
"api.http.server.bind.port";
public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
public static final int HTTP_RESPOND_CODE = 200;
- public static final String DEFAULT_PARAMS_AUDIT_TAG = "";
+ public static final String PARAMS_AUDIT_COMPONENT = "component";
+ public static final String PARAMS_AUDIT_HOST = "host";
+ public static final String PARAMS_AUDIT_PORT = "port";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index f0eebca6de..036781d773 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -164,4 +164,65 @@ public class SqlConstants {
"replace into audit_data_temp (log_ts,inlong_group_id,
inlong_stream_id, audit_id,audit_tag,count, size, delay) "
+ " values (?,?,?,?,?,?,?,?)";
+ public static final String KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL =
"audit.proxy.host.query.component.sql";
+ public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL =
+ "select\n" +
+ " host as host,\n" +
+ " port as port \n" +
+ "from\n" +
+ " audit_proxy_host\n" +
+ "where\n" +
+ " LOWER(component) = LOWER(?)\n" +
+ " and status = 1\n" +
+ "group by\n" +
+ " host,\n" +
+ " port";
+
+ public static final String KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL =
"audit.proxy.host.query.all.sql";
+ public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL =
+ "select\n" +
+ " component,\n" +
+ " host as host,\n" +
+ " port as port\n" +
+ "from\n" +
+ " audit_proxy_host\n" +
+ "where\n" +
+ " status = 1\n" +
+ "group by\n" +
+ " component,\n" +
+ " host,\n" +
+ " port";
+
+ public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL =
"audit.proxy.heartbeat.query.sql";
+ public static final String
DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL =
+ "select\n" +
+ " host,\n" +
+ " port\n" +
+ "from\n" +
+ " audit_proxy_heartbeat\n" +
+ "where\n" +
+ " LOWER(component) = LOWER(?)\n" +
+ " and update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
+ "group by\n" +
+ " host,\n" +
+ " port ";
+
+ public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL =
"audit.proxy.heartbeat.query.all.sql";
+ public static final String DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL =
+ "select\n" +
+ " host,\n" +
+ " port\n" +
+ "from\n" +
+ " audit_proxy_heartbeat\n" +
+ "where\n" +
+ " update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
+ "group by\n" +
+ " host,\n" +
+ " port";
+
+ public static final String KEY_AUDIT_PROXY_HEARTBEAT_SQL =
"audit.proxy.heartbeat.sql";
+ public static final String DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL =
+ "replace into audit_proxy_heartbeat (component, host, port)\n" +
+ "values (?, ?, ?)";
+
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
index 08ba84b7c7..04fc9186df 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
@@ -21,5 +21,5 @@ package org.apache.inlong.audit.entities;
* OpenAPI type
*/
public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+ MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditProxy.java
similarity index 84%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditProxy.java
index 08ba84b7c7..c594ae2ffe 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditProxy.java
@@ -17,9 +17,13 @@
package org.apache.inlong.audit.entities;
-/**
- * OpenAPI type
- */
-public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class AuditProxy {
+
+ private String host;
+ private int port;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Heartbeat.java
similarity index 82%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Heartbeat.java
index 08ba84b7c7..985e3365ab 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Heartbeat.java
@@ -17,9 +17,14 @@
package org.apache.inlong.audit.entities;
-/**
- * OpenAPI type
- */
-public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Heartbeat {
+
+ private String component;
+ private String host;
+ private int port;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
new file mode 100644
index 0000000000..1a46d757f0
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.heartbeat;
+
+import org.apache.inlong.audit.cache.AuditProxyCache;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.utils.JdbcUtils;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_SQL;
+
+public class ProxyHeartbeat {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuditProxyCache.class);
+ private static final ProxyHeartbeat instance = new ProxyHeartbeat();
+ private final BasicDataSource dataSource = new BasicDataSource();
+ private final String heartbeatSQL;
+
+ ProxyHeartbeat() {
+ heartbeatSQL =
+ Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_SQL,
DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL);
+ initDataSource();
+ }
+
+ private void initDataSource() {
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+ setDataSourceConfig(jdbcConfig);
+ dataSource.setTestOnBorrow(true);
+ dataSource.setValidationQuery("SELECT 1");
+
dataSource.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
+ DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+ }
+
+ private void setDataSourceConfig(JdbcConfig jdbcConfig) {
+ dataSource.setDriverClassName(jdbcConfig.getDriverClass());
+ dataSource.setUrl(jdbcConfig.getJdbcUrl());
+ dataSource.setUsername(jdbcConfig.getUserName());
+ dataSource.setPassword(jdbcConfig.getPassword());
+
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
+ DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
+
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
+
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+ }
+
+ public static ProxyHeartbeat getInstance() {
+ return instance;
+ }
+
+ public void heartbeat(String component, String host, int port) {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(heartbeatSQL)) {
+ statement.setString(1, component);
+ statement.setString(2, host);
+ statement.setInt(3, port);
+ statement.executeUpdate();
+ } catch (SQLException exception) {
+ LOGGER.error("Heartbeat {} {} {} has exception ", component, host,
port, exception);
+ }
+ }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index 3474df4968..8693539282 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.audit.service;
+import org.apache.inlong.audit.cache.AuditProxyCache;
import org.apache.inlong.audit.cache.DayCache;
import org.apache.inlong.audit.cache.HalfHourCache;
import org.apache.inlong.audit.cache.HourCache;
@@ -24,8 +25,11 @@ import org.apache.inlong.audit.cache.RealTimeQuery;
import org.apache.inlong.audit.cache.TenMinutesCache;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.ApiType;
+import org.apache.inlong.audit.entities.AuditComponent;
import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.AuditProxy;
import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.heartbeat.ProxyHeartbeat;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
@@ -48,20 +52,24 @@ import java.util.concurrent.Executors;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_PROXY_HEART_BEAT_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
import static
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_PROXY_HEART_BEAT_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
@@ -69,8 +77,11 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
+import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_HOST;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
+import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_PORT;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME;
import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
@@ -80,14 +91,17 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME;
import static
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.audit.entities.ApiType.DAY;
+import static org.apache.inlong.audit.entities.ApiType.GET_AUDIT_PROXY;
import static org.apache.inlong.audit.entities.ApiType.GET_IDS;
import static org.apache.inlong.audit.entities.ApiType.GET_IPS;
import static org.apache.inlong.audit.entities.ApiType.HOUR;
import static org.apache.inlong.audit.entities.ApiType.MINUTES;
+import static org.apache.inlong.audit.entities.ApiType.PROXY_HEARTBEAT;
public class ApiService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ApiService.class);
+
public void start() {
initHttpServer();
}
@@ -113,6 +127,12 @@ public class ApiService {
new AuditHandler(GET_IDS));
server.createContext(Configuration.getInstance().get(KEY_API_GET_IPS_PATH,
DEFAULT_API_GET_IPS_PATH),
new AuditHandler(GET_IPS));
+ server.createContext(
+
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH,
DEFAULT_API_GET_AUDIT_PROXY_PATH),
+ new AuditHandler(GET_AUDIT_PROXY));
+ server.createContext(
+
Configuration.getInstance().get(KEY_API_PROXY_HEART_BEAT_PATH,
DEFAULT_API_PROXY_HEART_BEAT_PATH),
+ new AuditHandler(PROXY_HEARTBEAT));
server.start();
LOGGER.info("Init http server success. Bind port is: {}",
bindPort);
} catch (Exception e) {
@@ -136,6 +156,7 @@ public class ApiService {
@Override
public void handle(HttpExchange exchange) {
+ LOGGER.info("handle {}", exchange.getRequestURI().toString());
if (null != limiter) {
limiter.acquire();
}
@@ -181,6 +202,7 @@ public class ApiService {
}
}
params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG);
+ params.putIfAbsent(PARAMS_AUDIT_COMPONENT,
AuditComponent.COMMON_AUDIT.getComponent());
return params;
}
@@ -206,6 +228,10 @@ public class ApiService {
&& params.containsKey(PARAMS_END_TIME)
&& params.containsKey(PARAMS_AUDIT_ID)
&& params.containsKey(PARAMS_IP);
+ case PROXY_HEARTBEAT:
+ return params.containsKey(PARAMS_AUDIT_HOST) &&
params.containsKey(PARAMS_AUDIT_PORT);
+ case GET_AUDIT_PROXY:
+ return true;
default:
return false;
}
@@ -219,11 +245,14 @@ public class ApiService {
}
private void handleLegalParams(JsonObject responseJson, Map<String,
String> params) {
- List<StatData> statData = null;
+ responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, true);
+ responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "");
+ Gson gson = new Gson();
+ List<StatData> statData;
try {
switch (apiType) {
case MINUTES:
- statData = handleMinutesApi(params);
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(handleMinutesApi(params)));
break;
case HOUR:
statData =
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
@@ -232,6 +261,7 @@ public class ApiService {
params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID),
params.get(PARAMS_AUDIT_TAG));
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
break;
case DAY:
statData = DayCache.getInstance().getData(
@@ -240,6 +270,7 @@ public class ApiService {
params.get(PARAMS_INLONG_GROUP_Id),
params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID));
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
break;
case GET_IDS:
statData = RealTimeQuery.getInstance().queryIdsByIp(
@@ -247,6 +278,7 @@ public class ApiService {
params.get(PARAMS_END_TIME),
params.get(PARAMS_IP),
params.get(PARAMS_AUDIT_ID));
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
break;
case GET_IPS:
statData = RealTimeQuery.getInstance().queryIpsById(
@@ -255,21 +287,26 @@ public class ApiService {
params.get(PARAMS_INLONG_GROUP_Id),
params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID));
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
+ break;
+ case GET_AUDIT_PROXY:
+ List<AuditProxy> auditProxy =
+
AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT));
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(auditProxy));
+ break;
+ case PROXY_HEARTBEAT:
+
ProxyHeartbeat.getInstance().heartbeat(params.get(PARAMS_AUDIT_COMPONENT),
+ params.get(PARAMS_AUDIT_HOST),
Integer.parseInt(params.get(PARAMS_AUDIT_PORT)));
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(new LinkedList<>()));
break;
default:
LOGGER.error("Unsupported interface type! type is {}",
apiType);
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(new LinkedList<>()));
}
} catch (Exception exception) {
LOGGER.error("Handle legal params has exception ", exception);
+ responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new
LinkedList<>()));
}
-
- if (null == statData)
- statData = new LinkedList<>();
-
- responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, true);
- responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "");
- Gson gson = new Gson();
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
}
private List<StatData> handleMinutesApi(Map<String, String> params) {
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index 333052c97f..57016d9558 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -132,3 +132,30 @@ CREATE TABLE IF NOT EXISTS `audit_source_config`
PRIMARY KEY (`source_name`, `jdbc_url`)
) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config';
+-- ----------------------------
+-- Table structure for audit proxy heartbeat
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_proxy_heartbeat`
+(
+ `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name,
such as: Agent, Sort...',
+ `host` varchar(64) NOT NULL COMMENT 'Audit proxy IP',
+ `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Update time',
+ PRIMARY KEY (`component`, `host`, `port`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Proxy Heartbeat';
+
+
+-- ----------------------------
+-- Table structure for audit proxy host
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_proxy_host`
+(
+ `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name,
such as: Agent, Sort...',
+ `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Component instance, can
be ip, name',
+ `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
+ `status` int(11) DEFAULT '1' COMMENT 'Audit source config status.
0:Offline,1:Online',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create
time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`component`, `host`, `port`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Porxy Host';
+