This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee78b7260 [INLONG-10481][Audit] Optimize Audit domain management (#10486)
4ee78b7260 is described below
commit 4ee78b726078d019e2549c549828d509b4c080fd
Author: doleyzi <[email protected]>
AuthorDate: Sat Jun 22 10:34:33 2024 +0800
[INLONG-10481][Audit] Optimize Audit domain management (#10486)
---
bin/init-config.sh | 3 +
docker/docker-compose/docker-compose.yml | 2 +
.../apache/inlong/audit/entity/AuditComponent.java | 2 +-
inlong-audit/audit-docker/Dockerfile | 3 +
inlong-audit/audit-docker/audit-docker.sh | 3 +
.../apache/inlong/audit/heartbeat/Heartbeat.java | 135 ------------
.../org/apache/inlong/audit/node/Application.java | 5 -
.../apache/inlong/audit/cache/AuditProxyCache.java | 236 +++++----------------
.../inlong/audit/config/OpenApiConstants.java | 7 -
.../ApiType.java => config/ProxyConstants.java} | 19 +-
.../apache/inlong/audit/config/SqlConstants.java | 65 +-----
.../org/apache/inlong/audit/entities/ApiType.java | 2 +-
.../inlong/audit/heartbeat/ProxyHeartbeat.java | 96 ---------
.../apache/inlong/audit/service/ApiService.java | 25 +--
.../org/apache/inlong/audit/sink/JdbcSink.java | 47 +++-
inlong-audit/conf/audit-service.properties | 8 +-
inlong-audit/sql/apache_inlong_audit_mysql.sql | 54 ++---
17 files changed, 156 insertions(+), 556 deletions(-)
diff --git a/bin/init-config.sh b/bin/init-config.sh
index dfc58d1c97..faf2e6f44d 100644
--- a/bin/init-config.sh
+++ b/bin/init-config.sh
@@ -75,6 +75,9 @@ init_inlong_audit() {
$SED_COMMAND 's#jdbc:mysql://.*apache_inlong_audit#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_audit'''#g' audit-service.properties
$SED_COMMAND 's/mysql.username=.*/'''mysql.username=${spring_datasource_username}'''/g' audit-service.properties
$SED_COMMAND 's/mysql.password=.*/'''mysql.password=${spring_datasource_password}'''/g' audit-service.properties
+ $SED_COMMAND 's/audit.proxy.address.agent=.*/'''audit.proxy.address.agent=${audit_service_ip}:${audit_proxy_port}'''/g' audit-service.properties
+ $SED_COMMAND 's/audit.proxy.address.dataproxy=.*/'''audit.proxy.address.dataproxy=${audit_service_ip}:${audit_proxy_port}'''/g' audit-service.properties
+ $SED_COMMAND 's/audit.proxy.address.sort=.*/'''audit.proxy.address.sort=${audit_service_ip}:${audit_proxy_port}'''/g' audit-service.properties
}
init_inlong_dataproxy() {
diff --git a/docker/docker-compose/docker-compose.yml b/docker/docker-compose/docker-compose.yml
index 4ba993a3dc..f086ef0ce7 100644
--- a/docker/docker-compose/docker-compose.yml
+++ b/docker/docker-compose/docker-compose.yml
@@ -114,9 +114,11 @@ services:
- AUDIT_JDBC_PASSWORD=inlong
- MANAGER_OPENAPI_IP=manager
- MANAGER_OPENAPI_PORT=8083
+ - AUDIT_PROXY_ADDRESS=audit:10081
# pulsar or kafka
- MQ_TYPE=pulsar
ports:
+ - "10080:10080"
- "10081:10081"
# flink jobmanager
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
index b0d2e17b54..180d6930a7 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
@@ -19,7 +19,7 @@ package org.apache.inlong.audit.entity;
public enum AuditComponent {
- AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), COMMON_AUDIT("Common");
+ AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort");
private final String component;
/**
diff --git a/inlong-audit/audit-docker/Dockerfile
b/inlong-audit/audit-docker/Dockerfile
index 2f842c4b9e..ce8d4b68a2 100644
--- a/inlong-audit/audit-docker/Dockerfile
+++ b/inlong-audit/audit-docker/Dockerfile
@@ -41,6 +41,9 @@ ENV AUDIT_JDBC_URL=127.0.0.1:3306
ENV AUDIT_JDBC_USERNAME=root
ENV AUDIT_JDBC_PASSWORD=inlong
+# Audit Proxy host
+ENV AUDIT_PROXY_ADDRESS=127.0.0.1:10081
+
# jvm
ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy"
WORKDIR /opt/inlong-audit
diff --git a/inlong-audit/audit-docker/audit-docker.sh
b/inlong-audit/audit-docker/audit-docker.sh
index 2c73b18c34..354ac70f39 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -61,6 +61,9 @@ sed -i
"s/audit.store.jdbc.password=.*$/audit.store.jdbc.password=${AUDIT_JDBC_P
sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${service_conf_file}"
+sed -i "s/audit.proxy.address.agent=.*$/audit.proxy.address.agent = ${AUDIT_PROXY_ADDRESS}/g" "${service_conf_file}"
+sed -i "s/audit.proxy.address.dataproxy=.*$/audit.proxy.address.dataproxy = ${AUDIT_PROXY_ADDRESS}/g" "${service_conf_file}"
+sed -i "s/audit.proxy.address.sort=.*$/audit.proxy.address.sort = ${AUDIT_PROXY_ADDRESS}/g" "${service_conf_file}"
# Whether the database table exists. If it does not exist, initialize the database and skip if it exists.
if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
deleted file mode 100644
index e4f23a6ed8..0000000000
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.heartbeat;
-
-import org.apache.inlong.audit.file.ConfigManager;
-import org.apache.inlong.common.util.NetworkUtils;
-
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-import static org.apache.inlong.audit.entity.AuditComponent.COMMON_AUDIT;
-
-public class Heartbeat {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(Heartbeat.class);
- private final static String HEARTBEAT_PATH = "/audit/proxy/heartbeat";
- private static final String AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY =
"audit.heartbeat.interval";
- private static final String AUDIT_SERVICE_HOST_CONFIG_KEY =
"audit.service.host";
- private static final String AUDIT_SERVICE_PORT_CONFIG_KEY =
"agent1.sources.tcp-source.port";
- private static final String AUDIT_COMPONENT_CONFIG_KEY = "audit.component";
- private String heartbeatHost;
- private final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor();
- private final String localIp;
- private final String localPort;
-
- public Heartbeat() {
- localIp = NetworkUtils.getLocalIp();
- localPort = getLocalPort();
- }
-
- public void Start() {
- heartbeatHost = getConfiguredValue(AUDIT_SERVICE_HOST_CONFIG_KEY);
- timer.scheduleWithFixedDelay(this::heartbeat,
- 1,
- getConfiguredInterval(),
- TimeUnit.MINUTES);
- }
-
- private void heartbeat() {
- if (heartbeatHost == null || localPort == null) {
- LOGGER.info("Heartbeat is not configure, Don`t need heartbeat");
- return;
- }
- try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
- URIBuilder uriBuilder = new URIBuilder("http://" + heartbeatHost +
HEARTBEAT_PATH);
- uriBuilder.setParameter("component", getConfiguredComponent());
- uriBuilder.setParameter("host", localIp);
- uriBuilder.setParameter("port", localPort);
-
- HttpGet httpGet = new HttpGet(uriBuilder.build());
-
- try (CloseableHttpResponse response = httpClient.execute(httpGet))
{
- String responseBody =
EntityUtils.toString(response.getEntity());
- LOGGER.info("Heartbeat response: {}", responseBody);
- }
- } catch (Exception exception) {
- LOGGER.error("Heartbeat has exception", exception);
- }
- }
-
- private int getConfiguredInterval() {
- String intervalConfigValue =
getConfiguredValue(AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY);
- return intervalConfigValue != null ?
Integer.parseInt(intervalConfigValue) : 1;
- }
-
- private String getConfiguredComponent() {
- String intervalConfigComponent =
getConfiguredValue(AUDIT_COMPONENT_CONFIG_KEY);
- return intervalConfigComponent != null ? intervalConfigComponent :
COMMON_AUDIT.getComponent();
- }
-
- private String getConfiguredValue(String configKey) {
- return ConfigManager.getInstance().getValue(configKey);
- }
-
- private String getLocalPort() {
- try (Stream<Path> paths = Files.walk(Paths.get("."))) {
- Path selectedPath = paths.filter(Files::isRegularFile)
- .filter(p -> p.toString().endsWith(".conf"))
- .findFirst()
- .orElse(null);
- if (selectedPath != null) {
- String port = loadProperties(selectedPath);
- LOGGER.info("File: {} , TCP Source Port: {}", selectedPath,
port);
- if (port != null) {
- return port;
- }
- }
- } catch (IOException e) {
- LOGGER.error("Get local port has error", e);
- }
- return null;
- }
-
- private String loadProperties(Path path) {
- Properties prop = new Properties();
- try {
- prop.load(Files.newInputStream(path));
- return prop.getProperty(AUDIT_SERVICE_PORT_CONFIG_KEY);
- } catch (IOException e) {
- LOGGER.error("Load properties has error", e);
- }
- return null;
- }
-}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index fcd32fb493..4ab7f29e51 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -18,7 +18,6 @@
package org.apache.inlong.audit.node;
import org.apache.inlong.audit.file.ConfigManager;
-import org.apache.inlong.audit.heartbeat.Heartbeat;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -77,8 +76,6 @@ public class Application {
private MonitorService monitorServer;
private final ReentrantLock lifecycleLock = new ReentrantLock();
- private static final Heartbeat heartbeat = new Heartbeat();
-
public Application() {
this(new ArrayList<LifecycleAware>(0));
}
@@ -338,8 +335,6 @@ public class Application {
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
- heartbeat.Start();
-
// start application
application.start();
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
index f05525096c..183900ce66 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
@@ -18,217 +18,91 @@
package org.apache.inlong.audit.cache;
import org.apache.inlong.audit.config.Configuration;
-import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entity.AuditProxy;
-import org.apache.inlong.audit.utils.JdbcUtils;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.commons.dbcp.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_MINUTES;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_MINUTES;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static
org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_AGENT;
+import static
org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY;
+import static
org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_SORT;
+import static org.apache.inlong.audit.config.ProxyConstants.IP_PORT_SEPARATOR;
+import static
org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_AGENT;
+import static
org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_DATAPROXY;
+import static
org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_SORT;
+import static org.apache.inlong.audit.config.ProxyConstants.PROXY_SEPARATOR;
+import static org.apache.inlong.audit.entity.AuditComponent.AGENT;
+import static org.apache.inlong.audit.entity.AuditComponent.DATAPROXY;
+import static org.apache.inlong.audit.entity.AuditComponent.SORT;
public class AuditProxyCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(AuditProxyCache.class);
private static final AuditProxyCache instance = new AuditProxyCache();
- private final Cache<String, HashSet<AuditProxy>> cache;
- protected final ScheduledExecutorService monitorTimer =
Executors.newSingleThreadScheduledExecutor();
- private final BasicDataSource dataSource = new BasicDataSource();
- private final String queryAllAuditProxyHostSQL;
- private final String queryAuditProxyHostByComponentSQL;
-
- private final String queryAuditProxyHeartbeatSQL;
- private final String queryAuditProxyHeartbeatByComponentSQL;
+ private final Map<String, List<AuditProxy>> auditProxyCache = new
HashMap<>();
private AuditProxyCache() {
- cache = Caffeine.newBuilder()
-
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
- DEFAULT_API_CACHE_MAX_SIZE))
-
.expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_MINUTES,
- DEFAULT_API_CACHE_EXPIRED_MINUTES), TimeUnit.MINUTES)
- .build();
- queryAuditProxyHostByComponentSQL =
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL,
- DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL);
-
- queryAllAuditProxyHostSQL =
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL,
- DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL);
- queryAuditProxyHeartbeatSQL =
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL,
- DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL);
- queryAuditProxyHeartbeatByComponentSQL =
-
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL,
- DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL);
-
- initDataSource();
- monitorTimer.scheduleWithFixedDelay(new Runnable() {
-
- @Override
- public void run() {
- update();
- }
- }, 0, 1, TimeUnit.MINUTES);
}
- private void initDataSource() {
- JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
- dataSource.setDriverClassName(jdbcConfig.getDriverClass());
- dataSource.setUrl(jdbcConfig.getJdbcUrl());
- dataSource.setUsername(jdbcConfig.getUserName());
- dataSource.setPassword(jdbcConfig.getPassword());
-
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
- DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
-
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
- DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
-
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
- DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
-
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
- DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
- dataSource.setTestOnBorrow(true);
- dataSource.setValidationQuery("SELECT 1");
- dataSource
-
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
- DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+ public boolean init() {
+ return initializeAuditProxyCache();
}
- public static AuditProxyCache getInstance() {
- return instance;
- }
+ private boolean initializeAuditProxyCache() {
+ AtomicBoolean isSuccess = new AtomicBoolean(true);
+ Map<String, String> proxyConfigs = getProxyConfigs();
+ proxyConfigs.forEach((component, proxyList) -> {
+ List<AuditProxy> auditProxies = createAuditProxySet(proxyList);
+ if (auditProxies.isEmpty()) {
+ LOGGER.error("{} Audit Proxy config = {}, is invalid!",
component, proxyList);
+ isSuccess.set(false);
+ } else {
+ LOGGER.info("{} Audit Proxy config = {}", component,
proxyList);
+ auditProxyCache.put(component, auditProxies);
+ }
+ });
- public List<AuditProxy> getData(String component) {
- HashSet<AuditProxy> result = cache.getIfPresent(component);
- if (result != null) {
- return new ArrayList<>(result);
- }
- result = queryAuditProxyInfo(component);
- if (result.isEmpty()) {
- result = queryAuditProxyHeartbeat(component);
- }
- if (!result.isEmpty()) {
- cache.put(component, result);
- }
- return new ArrayList<>(result);
+ return isSuccess.get();
}
- private void update() {
- Map<String, HashSet<AuditProxy>> auditProxyInfo =
queryAllAuditProxyInfo();
- if (auditProxyInfo.isEmpty()) {
- auditProxyInfo = queryAuditProxyHeartbeat();
- }
- if (auditProxyInfo.isEmpty()) {
- return;
- }
- for (Map.Entry<String, HashSet<AuditProxy>> entry :
auditProxyInfo.entrySet()) {
- try {
- cache.put(entry.getKey(), entry.getValue());
- } catch (Exception e) {
- LOGGER.error("Put data into audit proxy cache has exception!
", e);
- // Decide whether to break or continue based on your
requirement
- break;
- }
- }
+ private Map<String, String> getProxyConfigs() {
+ Configuration config = Configuration.getInstance();
+ Map<String, String> proxyConfigs = new HashMap<>();
+ proxyConfigs.put(AGENT.getComponent(),
+ config.get(KEY_AUDIT_PROXY_ADDRESS_AGENT,
DEFAULT_AUDIT_PROXY_ADDRESS_AGENT));
+ proxyConfigs.put(DATAPROXY.getComponent(),
+ config.get(KEY_AUDIT_PROXY_ADDRESS_DATAPROXY,
DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY));
+ proxyConfigs.put(SORT.getComponent(),
+ config.get(KEY_AUDIT_PROXY_ADDRESS_SORT,
DEFAULT_AUDIT_PROXY_ADDRESS_SORT));
+ return proxyConfigs;
}
- private Map<String, HashSet<AuditProxy>> queryAllAuditProxyInfo() {
- Map<String, HashSet<AuditProxy>> result = new HashMap<>();
- try (Connection connection = dataSource.getConnection();
- PreparedStatement statement =
connection.prepareStatement(queryAllAuditProxyHostSQL);
- ResultSet resultSet = statement.executeQuery()) {
- while (resultSet.next()) {
- String component = resultSet.getString("component");
- AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
- result.computeIfAbsent(component, k -> new
HashSet<>()).add(auditProxyInfo);
- }
- } catch (Exception exception) {
- LOGGER.error("Query audit proxy info has exception! ", exception);
- }
- return result;
+ private List<AuditProxy> createAuditProxySet(String proxyList) {
+ return Arrays.stream(proxyList.split(PROXY_SEPARATOR))
+ .map(element -> element.split(IP_PORT_SEPARATOR))
+ .filter(ipPort -> ipPort.length == 2)
+ .map(ipPort -> new AuditProxy(ipPort[0],
Integer.parseInt(ipPort[1])))
+ .collect(Collectors.toList());
}
- private HashSet<AuditProxy> queryAuditProxyInfo(String component) {
- HashSet<AuditProxy> result = new HashSet<>();
- try (Connection connection = dataSource.getConnection();
- PreparedStatement statement =
connection.prepareStatement(queryAuditProxyHostByComponentSQL)) {
- statement.setString(1, component);
- try (ResultSet resultSet = statement.executeQuery()) {
- while (resultSet.next()) {
- AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
- result.add(auditProxyInfo);
- }
- } catch (SQLException sqlException) {
- LOGGER.error("Query audit proxy info by {} has SQL exception
", component, sqlException);
- }
- } catch (Exception exception) {
- LOGGER.error("Query audit proxy info by {} has exception ",
component, exception);
- }
- return result;
+ public static AuditProxyCache getInstance() {
+ return instance;
}
- private Map<String, HashSet<AuditProxy>> queryAuditProxyHeartbeat() {
- Map<String, HashSet<AuditProxy>> result = new HashMap<>();
- try (Connection connection = dataSource.getConnection();
- PreparedStatement statement =
connection.prepareStatement(queryAuditProxyHeartbeatSQL);
- ResultSet resultSet = statement.executeQuery()) {
- while (resultSet.next()) {
- String component = resultSet.getString("component");
- AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
- result.computeIfAbsent(component, k -> new
HashSet<>()).add(auditProxyInfo);
- }
- } catch (Exception exception) {
- LOGGER.error("Query audit proxy info has exception! ", exception);
+ public List<AuditProxy> getData(String component) {
+ List<AuditProxy> result = auditProxyCache.get(component);
+ if (result == null) {
+ return new LinkedList<>();
}
return result;
}
- private HashSet<AuditProxy> queryAuditProxyHeartbeat(String component) {
- HashSet<AuditProxy> result = new HashSet<>();
- try (Connection connection = dataSource.getConnection();
- PreparedStatement statement =
connection.prepareStatement(queryAuditProxyHeartbeatByComponentSQL)) {
- statement.setString(1, component);
- try (ResultSet resultSet = statement.executeQuery()) {
- while (resultSet.next()) {
- AuditProxy auditProxyInfo = new
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
- result.add(auditProxyInfo);
- }
- } catch (SQLException sqlException) {
- LOGGER.error("Query audit proxy info by {} has SQL exception
", component, sqlException);
- }
- } catch (Exception exception) {
- LOGGER.error("Query audit proxy info by {} has exception ",
component, exception);
- }
- return result;
- }
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index 1c22a28fad..a727eba46b 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -35,8 +35,6 @@ public class OpenApiConstants {
public static final String DEFAULT_API_GET_IDS_PATH =
"/audit/query/getIds";
public static final String KEY_API_GET_AUDIT_PROXY_PATH =
"api.get.audit.proxy";
public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH =
"/audit/query/getAuditProxy";
- public static final String KEY_API_PROXY_HEART_BEAT_PATH =
"api.proxy.heartbeat";
- public static final String DEFAULT_API_PROXY_HEART_BEAT_PATH =
"/audit/proxy/heartbeat";
public static final String KEY_API_THREAD_POOL_SIZE =
"api.thread.pool.size";
public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
@@ -51,9 +49,6 @@ public class OpenApiConstants {
public static final String KEY_API_CACHE_EXPIRED_HOURS =
"api.cache.expired.hours";
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
- public static final String KEY_API_CACHE_EXPIRED_MINUTES =
"api.cache.expired.minutes";
- public static final int DEFAULT_API_CACHE_EXPIRED_MINUTES = 1;
-
// Http config
public static final String PARAMS_START_TIME = "startTime";
public static final String PARAMS_END_TIME = "endTime";
@@ -72,6 +67,4 @@ public class OpenApiConstants {
public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
public static final int HTTP_RESPOND_CODE = 200;
public static final String PARAMS_AUDIT_COMPONENT = "component";
- public static final String PARAMS_AUDIT_HOST = "host";
- public static final String PARAMS_AUDIT_PORT = "port";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java
similarity index 53%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java
index 04fc9186df..be689f8c8b 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java
@@ -15,11 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.entities;
+package org.apache.inlong.audit.config;
/**
- * OpenAPI type
+ * Proxy constants
*/
-public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT;
+public class ProxyConstants {
+
+ public static final String PROXY_SEPARATOR = ";";
+ public static final String IP_PORT_SEPARATOR = ":";
+ public static final String KEY_AUDIT_PROXY_ADDRESS_AGENT =
"audit.proxy.address.agent";
+ public static final String DEFAULT_AUDIT_PROXY_ADDRESS_AGENT = "";
+
+ public static final String KEY_AUDIT_PROXY_ADDRESS_DATAPROXY =
"audit.proxy.address.dataproxy";
+ public static final String DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY = "";
+
+ public static final String KEY_AUDIT_PROXY_ADDRESS_SORT =
"audit.proxy.address.sort";
+ public static final String DEFAULT_AUDIT_PROXY_ADDRESS_SORT = "";
+
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 9eb3950ea5..1cc794466c 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -163,68 +163,6 @@ public class SqlConstants {
public static final String DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL =
"replace into audit_data_temp (log_ts,inlong_group_id,
inlong_stream_id, audit_id,audit_tag,count, size, delay) "
+ " values (?,?,?,?,?,?,?,?)";
-
- public static final String KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL =
"audit.proxy.host.query.component.sql";
- public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL =
- "select\n" +
- " host as host,\n" +
- " port as port \n" +
- "from\n" +
- " audit_proxy_host\n" +
- "where\n" +
- " LOWER(component) = LOWER(?)\n" +
- " and status = 1\n" +
- "group by\n" +
- " host,\n" +
- " port";
-
- public static final String KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL =
"audit.proxy.host.query.all.sql";
- public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL =
- "select\n" +
- " component,\n" +
- " host as host,\n" +
- " port as port\n" +
- "from\n" +
- " audit_proxy_host\n" +
- "where\n" +
- " status = 1\n" +
- "group by\n" +
- " component,\n" +
- " host,\n" +
- " port";
-
- public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL =
"audit.proxy.heartbeat.query.sql";
- public static final String
DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL =
- "select\n" +
- " host,\n" +
- " port\n" +
- "from\n" +
- " audit_proxy_heartbeat\n" +
- "where\n" +
- " LOWER(component) = LOWER(?)\n" +
- " and update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
- "group by\n" +
- " host,\n" +
- " port ";
-
- public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL =
"audit.proxy.heartbeat.query.all.sql";
- public static final String DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL =
- "select\n" +
- " host,\n" +
- " port\n" +
- "from\n" +
- " audit_proxy_heartbeat\n" +
- "where\n" +
- " update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
- "group by\n" +
- " host,\n" +
- " port";
-
- public static final String KEY_AUDIT_PROXY_HEARTBEAT_SQL =
"audit.proxy.heartbeat.sql";
- public static final String DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL =
- "replace into audit_proxy_heartbeat (component, host, port)\n" +
- "values (?, ?, ?)";
-
public static final String KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL =
"audit.data.temp.add.partition.sql";
public static final String DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL =
"ALTER TABLE audit_data_temp ADD PARTITION (PARTITION %s VALUES
LESS THAN (TO_DAYS('%s')))";
@@ -233,4 +171,7 @@ public class SqlConstants {
public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
"ALTER TABLE audit_data_temp DROP PARTITION %s";
+ public static final String KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
"audit.data.temp.check.partition.sql";
+ public static final String DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
+ "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE
TABLE_NAME = 'audit_data_temp' and PARTITION_NAME = ?";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
index 04fc9186df..d87b7e3aa5 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
@@ -21,5 +21,5 @@ package org.apache.inlong.audit.entities;
* OpenAPI type
*/
public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT;
+ MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
deleted file mode 100644
index 1a46d757f0..0000000000
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.heartbeat;
-
-import org.apache.inlong.audit.cache.AuditProxyCache;
-import org.apache.inlong.audit.config.Configuration;
-import org.apache.inlong.audit.entities.JdbcConfig;
-import org.apache.inlong.audit.utils.JdbcUtils;
-
-import org.apache.commons.dbcp.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_SQL;
-
-public class ProxyHeartbeat {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(AuditProxyCache.class);
- private static final ProxyHeartbeat instance = new ProxyHeartbeat();
- private final BasicDataSource dataSource = new BasicDataSource();
- private final String heartbeatSQL;
-
- ProxyHeartbeat() {
- heartbeatSQL =
- Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_SQL,
DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL);
- initDataSource();
- }
-
- private void initDataSource() {
- JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
- setDataSourceConfig(jdbcConfig);
- dataSource.setTestOnBorrow(true);
- dataSource.setValidationQuery("SELECT 1");
-
dataSource.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
- DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
- }
-
- private void setDataSourceConfig(JdbcConfig jdbcConfig) {
- dataSource.setDriverClassName(jdbcConfig.getDriverClass());
- dataSource.setUrl(jdbcConfig.getJdbcUrl());
- dataSource.setUsername(jdbcConfig.getUserName());
- dataSource.setPassword(jdbcConfig.getPassword());
-
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
- DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
-
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
- DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
-
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
- DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
-
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
- DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
- }
-
- public static ProxyHeartbeat getInstance() {
- return instance;
- }
-
- public void heartbeat(String component, String host, int port) {
- try (Connection connection = dataSource.getConnection();
- PreparedStatement statement =
connection.prepareStatement(heartbeatSQL)) {
- statement.setString(1, component);
- statement.setString(2, host);
- statement.setInt(3, port);
- statement.executeUpdate();
- } catch (SQLException exception) {
- LOGGER.error("Heartbeat {} {} {} has exception ", component, host,
port, exception);
- }
- }
-}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index 3861ff60a2..fa1c56ab23 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -27,9 +27,7 @@ import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.ApiType;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.StatData;
-import org.apache.inlong.audit.entity.AuditComponent;
import org.apache.inlong.audit.entity.AuditProxy;
-import org.apache.inlong.audit.heartbeat.ProxyHeartbeat;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
@@ -57,7 +55,6 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_ID
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_PROXY_HEART_BEAT_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
@@ -69,7 +66,6 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PA
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_PROXY_HEART_BEAT_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
@@ -79,9 +75,7 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CO
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_HOST;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_PORT;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME;
import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
@@ -96,13 +90,17 @@ import static
org.apache.inlong.audit.entities.ApiType.GET_IDS;
import static org.apache.inlong.audit.entities.ApiType.GET_IPS;
import static org.apache.inlong.audit.entities.ApiType.HOUR;
import static org.apache.inlong.audit.entities.ApiType.MINUTES;
-import static org.apache.inlong.audit.entities.ApiType.PROXY_HEARTBEAT;
public class ApiService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ApiService.class);
public void start() {
+ if (!AuditProxyCache.getInstance().init()) {
+ LOGGER.error("Audit Proxy cache init failed! exit...");
+ System.exit(1);
+ }
+
initHttpServer();
}
@@ -130,9 +128,6 @@ public class ApiService {
server.createContext(
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH,
DEFAULT_API_GET_AUDIT_PROXY_PATH),
new AuditHandler(GET_AUDIT_PROXY));
- server.createContext(
-
Configuration.getInstance().get(KEY_API_PROXY_HEART_BEAT_PATH,
DEFAULT_API_PROXY_HEART_BEAT_PATH),
- new AuditHandler(PROXY_HEARTBEAT));
server.start();
LOGGER.info("Init http server success. Bind port is: {}",
bindPort);
} catch (Exception e) {
@@ -202,7 +197,6 @@ public class ApiService {
}
}
params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG);
- params.putIfAbsent(PARAMS_AUDIT_COMPONENT,
AuditComponent.COMMON_AUDIT.getComponent());
return params;
}
@@ -228,10 +222,8 @@ public class ApiService {
&& params.containsKey(PARAMS_END_TIME)
&& params.containsKey(PARAMS_AUDIT_ID)
&& params.containsKey(PARAMS_IP);
- case PROXY_HEARTBEAT:
- return params.containsKey(PARAMS_AUDIT_HOST) &&
params.containsKey(PARAMS_AUDIT_PORT);
case GET_AUDIT_PROXY:
- return true;
+ return params.containsKey(PARAMS_AUDIT_COMPONENT);
default:
return false;
}
@@ -294,11 +286,6 @@ public class ApiService {
AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT));
responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(auditProxy));
break;
- case PROXY_HEARTBEAT:
-
ProxyHeartbeat.getInstance().heartbeat(params.get(PARAMS_AUDIT_COMPONENT),
- params.get(PARAMS_AUDIT_HOST),
Integer.parseInt(params.get(PARAMS_AUDIT_PORT)));
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(new LinkedList<>()));
- break;
default:
LOGGER.error("Unsupported interface type! type is {}",
apiType);
responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(new LinkedList<>()));
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index d6d4781d3d..ac9afc50d4 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -31,6 +31,8 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
@@ -63,8 +65,10 @@ import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_
import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
/**
@@ -83,6 +87,7 @@ public class JdbcSink implements AutoCloseable {
private final DateTimeFormatter FORMATTER_YYMMDDHH =
DateTimeFormatter.ofPattern("yyyyMMdd");
private final DateTimeFormatter FORMATTER_YY_MM_DD_HH =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private final String checkPartitionSql;
public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
this.dataQueue = dataQueue;
@@ -93,6 +98,9 @@ public class JdbcSink implements AutoCloseable {
pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT,
DEFAULT_QUEUE_PULL_TIMEOUT);
+ checkPartitionSql =
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL,
+ DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL);
+
}
/**
@@ -108,6 +116,9 @@ public class JdbcSink implements AutoCloseable {
TimeUnit.MILLISECONDS);
if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS,
DEFAULT_ENABLE_MANAGE_PARTITIONS)) {
+ // Create MySQL data partition of today
+ addPartition(0);
+
partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions,
0,
Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS,
@@ -178,7 +189,9 @@ public class JdbcSink implements AutoCloseable {
}
private void managePartitions() {
- addPartition();
+ // Create MySQL data partition of tomorrow
+ addPartition(1);
+
deletePartition();
}
@@ -186,9 +199,13 @@ public class JdbcSink implements AutoCloseable {
return "p" + date.format(FORMATTER_YYMMDDHH);
}
- private void addPartition() {
- String partitionName =
formatPartitionName(LocalDate.now().plusDays(1));
- String partitionValue =
LocalDate.now().plusDays(2).format(FORMATTER_YY_MM_DD_HH);
+ private void addPartition(long daysToAdd) {
+ String partitionName =
formatPartitionName(LocalDate.now().plusDays(daysToAdd));
+ if (isPartitionExists(partitionName)) {
+ LOGGER.info("Partition [{}] is exist, dont`t need add this
partition.", partitionName);
+ return;
+ }
+ String partitionValue = LocalDate.now().plusDays(daysToAdd +
1).format(FORMATTER_YY_MM_DD_HH);
String addPartitionSQL = String.format(
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL),
@@ -200,6 +217,10 @@ public class JdbcSink implements AutoCloseable {
int daysToSubtract =
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS,
DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
String partitionName =
formatPartitionName(LocalDate.now().minusDays(daysToSubtract));
+ if (!isPartitionExists(partitionName)) {
+ LOGGER.info("Partition [{}] is not exist, dont`t need delete this
partition.", partitionName);
+ return;
+ }
String deletePartitionSQL = String.format(
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL),
@@ -217,6 +238,24 @@ public class JdbcSink implements AutoCloseable {
}
}
+    /**
+     * Checks whether a partition with the given name is reported by the configured
+     * check-partition SQL.
+     *
+     * <p>The partition name is bound as the statement's only parameter and the {@code count}
+     * column of the first result row is read.
+     *
+     * @param partitionName name of the partition to look up
+     * @return {@code true} if the query reports a count greater than zero; {@code false} if the
+     *         count is zero, the result is empty, or the lookup fails (failures are logged and
+     *         not rethrown)
+     */
+    private boolean isPartitionExists(String partitionName) {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = connection.prepareStatement(checkPartitionSql)) {
+            statement.setString(1, partitionName);
+
+            try (ResultSet resultSet = statement.executeQuery()) {
+                if (resultSet.next()) {
+                    return resultSet.getInt("count") > 0;
+                }
+            } catch (SQLException sqlException) {
+                LOGGER.error("An error occurred while checking partition [{}] existence:",
+                        partitionName, sqlException);
+            }
+        } catch (Exception exception) {
+            // Covers failures to obtain the connection or prepare the statement.
+            LOGGER.error("An exception occurred while checking partition [{}] existence:",
+                    partitionName, exception);
+        }
+        return false;
+    }
+
public void destroy() {
sinkTimer.shutdown();
}
diff --git a/inlong-audit/conf/audit-service.properties
b/inlong-audit/conf/audit-service.properties
index 4d74ff6959..6c24c96105 100644
--- a/inlong-audit/conf/audit-service.properties
+++ b/inlong-audit/conf/audit-service.properties
@@ -19,4 +19,10 @@
# mysql config
mysql.jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true
mysql.username=root
-mysql.password=inlong
\ No newline at end of file
+mysql.password=inlong
+
+# Audit Proxy config.
+# If there are multiple addresses, separate them with semicolons (address;address), for example: 127.0.0.1:10081;127.0.0.2:10081
+audit.proxy.address.agent=127.0.0.1:10081
+audit.proxy.address.dataproxy=127.0.0.1:10081
+audit.proxy.address.sort=127.0.0.1:10081
\ No newline at end of file
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index 57016d9558..e9e114e2e0 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -58,20 +58,21 @@ DEFAULT CHARSET = UTF8 COMMENT ='Inlong audit data table';
-- You can create daily partitions or hourly partitions through the log_ts
field.
-- The specific partition type is determined based on the actual data volume.
-- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_data_temp`
-(
- `log_ts` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT
'log timestamp',
- `inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong group id',
- `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong stream id',
- `audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
- `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag',
- `count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
count',
- `size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
size',
- `delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
delay count',
- `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
- PRIMARY KEY
(`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`)
+CREATE TABLE IF NOT EXISTS `audit_data_temp` (
+ `log_ts` datetime NOT NULL DEFAULT '0000-00-00 00:00:00'
COMMENT 'log timestamp',
+ `audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
+ `inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong group id',
+ `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong stream id',
+ `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag',
+ `count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count',
+ `size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size',
+ `delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay
count',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Update time',
+ PRIMARY KEY
(`log_ts`,`audit_id`,`inlong_group_id`,`inlong_stream_id`,`audit_tag`)
) ENGINE = InnoDB
-DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data temp table';
+DEFAULT CHARSET = UTF8 COMMENT ='InLong audit data temp table'
+PARTITION BY RANGE (to_days(`log_ts`))
+(PARTITION pDefault VALUES LESS THAN (TO_DAYS('1970-01-01')));
-- ----------------------------
-- Table structure for audit_data_day
@@ -132,30 +133,3 @@ CREATE TABLE IF NOT EXISTS `audit_source_config`
PRIMARY KEY (`source_name`, `jdbc_url`)
) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config';
--- ----------------------------
--- Table structure for audit proxy heartbeat
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_proxy_heartbeat`
-(
- `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name,
such as: Agent, Sort...',
- `host` varchar(64) NOT NULL COMMENT 'Audit proxy IP',
- `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
- `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Update time',
- PRIMARY KEY (`component`, `host`, `port`)
-) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Proxy Heartbeat';
-
-
--- ----------------------------
--- Table structure for audit proxy host
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_proxy_host`
-(
- `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name,
such as: Agent, Sort...',
- `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Component instance, can
be ip, name',
- `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
- `status` int(11) DEFAULT '1' COMMENT 'Audit source config status.
0:Offline,1:Online',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create
time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`component`, `host`, `port`)
-) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Porxy Host';
-