This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee78b7260 [INLONG-10481][Audit] Optimize Audit domain management 
(#10486)
4ee78b7260 is described below

commit 4ee78b726078d019e2549c549828d509b4c080fd
Author: doleyzi <[email protected]>
AuthorDate: Sat Jun 22 10:34:33 2024 +0800

    [INLONG-10481][Audit] Optimize Audit domain management (#10486)
---
 bin/init-config.sh                                 |   3 +
 docker/docker-compose/docker-compose.yml           |   2 +
 .../apache/inlong/audit/entity/AuditComponent.java |   2 +-
 inlong-audit/audit-docker/Dockerfile               |   3 +
 inlong-audit/audit-docker/audit-docker.sh          |   3 +
 .../apache/inlong/audit/heartbeat/Heartbeat.java   | 135 ------------
 .../org/apache/inlong/audit/node/Application.java  |   5 -
 .../apache/inlong/audit/cache/AuditProxyCache.java | 236 +++++----------------
 .../inlong/audit/config/OpenApiConstants.java      |   7 -
 .../ApiType.java => config/ProxyConstants.java}    |  19 +-
 .../apache/inlong/audit/config/SqlConstants.java   |  65 +-----
 .../org/apache/inlong/audit/entities/ApiType.java  |   2 +-
 .../inlong/audit/heartbeat/ProxyHeartbeat.java     |  96 ---------
 .../apache/inlong/audit/service/ApiService.java    |  25 +--
 .../org/apache/inlong/audit/sink/JdbcSink.java     |  47 +++-
 inlong-audit/conf/audit-service.properties         |   8 +-
 inlong-audit/sql/apache_inlong_audit_mysql.sql     |  54 ++---
 17 files changed, 156 insertions(+), 556 deletions(-)

diff --git a/bin/init-config.sh b/bin/init-config.sh
index dfc58d1c97..faf2e6f44d 100644
--- a/bin/init-config.sh
+++ b/bin/init-config.sh
@@ -75,6 +75,9 @@ init_inlong_audit() {
   $SED_COMMAND 
's#jdbc:mysql://.*apache_inlong_audit#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_audit'''#g'
 audit-service.properties
   $SED_COMMAND 
's/mysql.username=.*/'''mysql.username=${spring_datasource_username}'''/g' 
audit-service.properties
   $SED_COMMAND 
's/mysql.password=.*/'''mysql.password=${spring_datasource_password}'''/g' 
audit-service.properties
+  $SED_COMMAND 
's/audit.proxy.address.agent=.*/'''audit.proxy.address.agent=${audit_service_ip}:${audit_proxy_port}'''/g'
 audit-service.properties
+  $SED_COMMAND 
's/audit.proxy.address.dataproxy=.*/'''audit.proxy.address.dataproxy=${audit_service_ip}:${audit_proxy_port}'''/g'
 audit-service.properties
+  $SED_COMMAND 
's/audit.proxy.address.sort=.*/'''audit.proxy.address.sort=${audit_service_ip}:${audit_proxy_port}'''/g'
 audit-service.properties
 }
 
 init_inlong_dataproxy() {
diff --git a/docker/docker-compose/docker-compose.yml 
b/docker/docker-compose/docker-compose.yml
index 4ba993a3dc..f086ef0ce7 100644
--- a/docker/docker-compose/docker-compose.yml
+++ b/docker/docker-compose/docker-compose.yml
@@ -114,9 +114,11 @@ services:
       - AUDIT_JDBC_PASSWORD=inlong
       - MANAGER_OPENAPI_IP=manager
       - MANAGER_OPENAPI_PORT=8083
+      - AUDIT_PROXY_ADDRESS=audit:10081
       # pulsar or kafka
       - MQ_TYPE=pulsar
     ports:
+      - "10080:10080"
       - "10081:10081"
 
   # flink jobmanager
diff --git 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
index b0d2e17b54..180d6930a7 100644
--- 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java
@@ -19,7 +19,7 @@ package org.apache.inlong.audit.entity;
 
 public enum AuditComponent {
 
-    AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), 
COMMON_AUDIT("Common");
+    AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort");
     private final String component;
 
     /**
diff --git a/inlong-audit/audit-docker/Dockerfile 
b/inlong-audit/audit-docker/Dockerfile
index 2f842c4b9e..ce8d4b68a2 100644
--- a/inlong-audit/audit-docker/Dockerfile
+++ b/inlong-audit/audit-docker/Dockerfile
@@ -41,6 +41,9 @@ ENV AUDIT_JDBC_URL=127.0.0.1:3306
 ENV AUDIT_JDBC_USERNAME=root
 ENV AUDIT_JDBC_PASSWORD=inlong
 
+# Audit Proxy host
+ENV AUDIT_PROXY_ADDRESS=127.0.0.1:10081
+
 # jvm
 ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport 
-XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 
-XX:-UseAdaptiveSizePolicy"
 WORKDIR /opt/inlong-audit
diff --git a/inlong-audit/audit-docker/audit-docker.sh 
b/inlong-audit/audit-docker/audit-docker.sh
index 2c73b18c34..354ac70f39 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -61,6 +61,9 @@ sed -i 
"s/audit.store.jdbc.password=.*$/audit.store.jdbc.password=${AUDIT_JDBC_P
 sed -i 
"s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g"
 "${service_conf_file}"
 sed -i 
"s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g" 
"${service_conf_file}"
 sed -i 
"s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g" 
"${service_conf_file}"
+sed -i "s/audit.proxy.address.agent=.*$/audit.proxy.address.agent = 
${AUDIT_PROXY_ADDRESS}/g" "${service_conf_file}"
+sed -i "s/audit.proxy.address.dataproxy=.*$/audit.proxy.address.dataproxy = 
${AUDIT_PROXY_ADDRESS}/g" "${service_conf_file}"
+sed -i "s/audit.proxy.address.sort=.*$/audit.proxy.address.sort = 
${AUDIT_PROXY_ADDRESS}/g" "${service_conf_file}"
 
 # Whether the database table exists. If it does not exist, initialize the 
database and skip if it exists.
 if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
deleted file mode 100644
index e4f23a6ed8..0000000000
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.heartbeat;
-
-import org.apache.inlong.audit.file.ConfigManager;
-import org.apache.inlong.common.util.NetworkUtils;
-
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-import static org.apache.inlong.audit.entity.AuditComponent.COMMON_AUDIT;
-
-public class Heartbeat {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(Heartbeat.class);
-    private final static String HEARTBEAT_PATH = "/audit/proxy/heartbeat";
-    private static final String AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY = 
"audit.heartbeat.interval";
-    private static final String AUDIT_SERVICE_HOST_CONFIG_KEY = 
"audit.service.host";
-    private static final String AUDIT_SERVICE_PORT_CONFIG_KEY = 
"agent1.sources.tcp-source.port";
-    private static final String AUDIT_COMPONENT_CONFIG_KEY = "audit.component";
-    private String heartbeatHost;
-    private final ScheduledExecutorService timer = 
Executors.newSingleThreadScheduledExecutor();
-    private final String localIp;
-    private final String localPort;
-
-    public Heartbeat() {
-        localIp = NetworkUtils.getLocalIp();
-        localPort = getLocalPort();
-    }
-
-    public void Start() {
-        heartbeatHost = getConfiguredValue(AUDIT_SERVICE_HOST_CONFIG_KEY);
-        timer.scheduleWithFixedDelay(this::heartbeat,
-                1,
-                getConfiguredInterval(),
-                TimeUnit.MINUTES);
-    }
-
-    private void heartbeat() {
-        if (heartbeatHost == null || localPort == null) {
-            LOGGER.info("Heartbeat is not configure, Don`t need heartbeat");
-            return;
-        }
-        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
-            URIBuilder uriBuilder = new URIBuilder("http://"; + heartbeatHost + 
HEARTBEAT_PATH);
-            uriBuilder.setParameter("component", getConfiguredComponent());
-            uriBuilder.setParameter("host", localIp);
-            uriBuilder.setParameter("port", localPort);
-
-            HttpGet httpGet = new HttpGet(uriBuilder.build());
-
-            try (CloseableHttpResponse response = httpClient.execute(httpGet)) 
{
-                String responseBody = 
EntityUtils.toString(response.getEntity());
-                LOGGER.info("Heartbeat response: {}", responseBody);
-            }
-        } catch (Exception exception) {
-            LOGGER.error("Heartbeat has exception", exception);
-        }
-    }
-
-    private int getConfiguredInterval() {
-        String intervalConfigValue = 
getConfiguredValue(AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY);
-        return intervalConfigValue != null ? 
Integer.parseInt(intervalConfigValue) : 1;
-    }
-
-    private String getConfiguredComponent() {
-        String intervalConfigComponent = 
getConfiguredValue(AUDIT_COMPONENT_CONFIG_KEY);
-        return intervalConfigComponent != null ? intervalConfigComponent : 
COMMON_AUDIT.getComponent();
-    }
-
-    private String getConfiguredValue(String configKey) {
-        return ConfigManager.getInstance().getValue(configKey);
-    }
-
-    private String getLocalPort() {
-        try (Stream<Path> paths = Files.walk(Paths.get("."))) {
-            Path selectedPath = paths.filter(Files::isRegularFile)
-                    .filter(p -> p.toString().endsWith(".conf"))
-                    .findFirst()
-                    .orElse(null);
-            if (selectedPath != null) {
-                String port = loadProperties(selectedPath);
-                LOGGER.info("File: {} , TCP Source Port: {}", selectedPath, 
port);
-                if (port != null) {
-                    return port;
-                }
-            }
-        } catch (IOException e) {
-            LOGGER.error("Get local port has error", e);
-        }
-        return null;
-    }
-
-    private String loadProperties(Path path) {
-        Properties prop = new Properties();
-        try {
-            prop.load(Files.newInputStream(path));
-            return prop.getProperty(AUDIT_SERVICE_PORT_CONFIG_KEY);
-        } catch (IOException e) {
-            LOGGER.error("Load properties has error", e);
-        }
-        return null;
-    }
-}
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index fcd32fb493..4ab7f29e51 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++ 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.audit.node;
 
 import org.apache.inlong.audit.file.ConfigManager;
-import org.apache.inlong.audit.heartbeat.Heartbeat;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
@@ -77,8 +76,6 @@ public class Application {
     private MonitorService monitorServer;
     private final ReentrantLock lifecycleLock = new ReentrantLock();
 
-    private static final Heartbeat heartbeat = new Heartbeat();
-
     public Application() {
         this(new ArrayList<LifecycleAware>(0));
     }
@@ -338,8 +335,6 @@ public class Application {
                 
application.handleConfigurationEvent(configurationProvider.getConfiguration());
             }
 
-            heartbeat.Start();
-
             // start application
             application.start();
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
index f05525096c..183900ce66 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
@@ -18,217 +18,91 @@
 package org.apache.inlong.audit.cache;
 
 import org.apache.inlong.audit.config.Configuration;
-import org.apache.inlong.audit.entities.JdbcConfig;
 import org.apache.inlong.audit.entity.AuditProxy;
-import org.apache.inlong.audit.utils.JdbcUtils;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.commons.dbcp.BasicDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_MINUTES;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_MINUTES;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static 
org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_AGENT;
+import static 
org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY;
+import static 
org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_SORT;
+import static org.apache.inlong.audit.config.ProxyConstants.IP_PORT_SEPARATOR;
+import static 
org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_AGENT;
+import static 
org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_DATAPROXY;
+import static 
org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_SORT;
+import static org.apache.inlong.audit.config.ProxyConstants.PROXY_SEPARATOR;
+import static org.apache.inlong.audit.entity.AuditComponent.AGENT;
+import static org.apache.inlong.audit.entity.AuditComponent.DATAPROXY;
+import static org.apache.inlong.audit.entity.AuditComponent.SORT;
 
 public class AuditProxyCache {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditProxyCache.class);
     private static final AuditProxyCache instance = new AuditProxyCache();
-    private final Cache<String, HashSet<AuditProxy>> cache;
-    protected final ScheduledExecutorService monitorTimer = 
Executors.newSingleThreadScheduledExecutor();
-    private final BasicDataSource dataSource = new BasicDataSource();
-    private final String queryAllAuditProxyHostSQL;
-    private final String queryAuditProxyHostByComponentSQL;
-
-    private final String queryAuditProxyHeartbeatSQL;
-    private final String queryAuditProxyHeartbeatByComponentSQL;
+    private final Map<String, List<AuditProxy>> auditProxyCache = new 
HashMap<>();
 
     private AuditProxyCache() {
-        cache = Caffeine.newBuilder()
-                
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
-                        DEFAULT_API_CACHE_MAX_SIZE))
-                
.expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_MINUTES,
-                        DEFAULT_API_CACHE_EXPIRED_MINUTES), TimeUnit.MINUTES)
-                .build();
-        queryAuditProxyHostByComponentSQL = 
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL,
-                DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL);
-
-        queryAllAuditProxyHostSQL = 
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL,
-                DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL);
-        queryAuditProxyHeartbeatSQL = 
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL,
-                DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL);
-        queryAuditProxyHeartbeatByComponentSQL =
-                
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL,
-                        DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL);
-
-        initDataSource();
-        monitorTimer.scheduleWithFixedDelay(new Runnable() {
-
-            @Override
-            public void run() {
-                update();
-            }
-        }, 0, 1, TimeUnit.MINUTES);
     }
 
-    private void initDataSource() {
-        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
-        dataSource.setDriverClassName(jdbcConfig.getDriverClass());
-        dataSource.setUrl(jdbcConfig.getJdbcUrl());
-        dataSource.setUsername(jdbcConfig.getUserName());
-        dataSource.setPassword(jdbcConfig.getPassword());
-        
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
-                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
-        
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
-                DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
-        
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
-                DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
-        
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
-                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
-        dataSource.setTestOnBorrow(true);
-        dataSource.setValidationQuery("SELECT 1");
-        dataSource
-                
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
-                        DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+    public boolean init() {
+        return initializeAuditProxyCache();
     }
 
-    public static AuditProxyCache getInstance() {
-        return instance;
-    }
+    private boolean initializeAuditProxyCache() {
+        AtomicBoolean isSuccess = new AtomicBoolean(true);
+        Map<String, String> proxyConfigs = getProxyConfigs();
+        proxyConfigs.forEach((component, proxyList) -> {
+            List<AuditProxy> auditProxies = createAuditProxySet(proxyList);
+            if (auditProxies.isEmpty()) {
+                LOGGER.error("{} Audit Proxy config = {}, is invalid!", 
component, proxyList);
+                isSuccess.set(false);
+            } else {
+                LOGGER.info("{} Audit Proxy config = {}", component, 
proxyList);
+                auditProxyCache.put(component, auditProxies);
+            }
+        });
 
-    public List<AuditProxy> getData(String component) {
-        HashSet<AuditProxy> result = cache.getIfPresent(component);
-        if (result != null) {
-            return new ArrayList<>(result);
-        }
-        result = queryAuditProxyInfo(component);
-        if (result.isEmpty()) {
-            result = queryAuditProxyHeartbeat(component);
-        }
-        if (!result.isEmpty()) {
-            cache.put(component, result);
-        }
-        return new ArrayList<>(result);
+        return isSuccess.get();
     }
 
-    private void update() {
-        Map<String, HashSet<AuditProxy>> auditProxyInfo = 
queryAllAuditProxyInfo();
-        if (auditProxyInfo.isEmpty()) {
-            auditProxyInfo = queryAuditProxyHeartbeat();
-        }
-        if (auditProxyInfo.isEmpty()) {
-            return;
-        }
-        for (Map.Entry<String, HashSet<AuditProxy>> entry : 
auditProxyInfo.entrySet()) {
-            try {
-                cache.put(entry.getKey(), entry.getValue());
-            } catch (Exception e) {
-                LOGGER.error("Put data into audit proxy cache has exception! 
", e);
-                // Decide whether to break or continue based on your 
requirement
-                break;
-            }
-        }
+    private Map<String, String> getProxyConfigs() {
+        Configuration config = Configuration.getInstance();
+        Map<String, String> proxyConfigs = new HashMap<>();
+        proxyConfigs.put(AGENT.getComponent(),
+                config.get(KEY_AUDIT_PROXY_ADDRESS_AGENT, 
DEFAULT_AUDIT_PROXY_ADDRESS_AGENT));
+        proxyConfigs.put(DATAPROXY.getComponent(),
+                config.get(KEY_AUDIT_PROXY_ADDRESS_DATAPROXY, 
DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY));
+        proxyConfigs.put(SORT.getComponent(),
+                config.get(KEY_AUDIT_PROXY_ADDRESS_SORT, 
DEFAULT_AUDIT_PROXY_ADDRESS_SORT));
+        return proxyConfigs;
     }
 
-    private Map<String, HashSet<AuditProxy>> queryAllAuditProxyInfo() {
-        Map<String, HashSet<AuditProxy>> result = new HashMap<>();
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement statement = 
connection.prepareStatement(queryAllAuditProxyHostSQL);
-                ResultSet resultSet = statement.executeQuery()) {
-            while (resultSet.next()) {
-                String component = resultSet.getString("component");
-                AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
-                result.computeIfAbsent(component, k -> new 
HashSet<>()).add(auditProxyInfo);
-            }
-        } catch (Exception exception) {
-            LOGGER.error("Query audit proxy info has exception! ", exception);
-        }
-        return result;
+    private List<AuditProxy> createAuditProxySet(String proxyList) {
+        return Arrays.stream(proxyList.split(PROXY_SEPARATOR))
+                .map(element -> element.split(IP_PORT_SEPARATOR))
+                .filter(ipPort -> ipPort.length == 2)
+                .map(ipPort -> new AuditProxy(ipPort[0], 
Integer.parseInt(ipPort[1])))
+                .collect(Collectors.toList());
     }
 
-    private HashSet<AuditProxy> queryAuditProxyInfo(String component) {
-        HashSet<AuditProxy> result = new HashSet<>();
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement statement = 
connection.prepareStatement(queryAuditProxyHostByComponentSQL)) {
-            statement.setString(1, component);
-            try (ResultSet resultSet = statement.executeQuery()) {
-                while (resultSet.next()) {
-                    AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
-                    result.add(auditProxyInfo);
-                }
-            } catch (SQLException sqlException) {
-                LOGGER.error("Query audit proxy info by {} has SQL exception 
", component, sqlException);
-            }
-        } catch (Exception exception) {
-            LOGGER.error("Query audit proxy info by {} has  exception ", 
component, exception);
-        }
-        return result;
+    public static AuditProxyCache getInstance() {
+        return instance;
     }
 
-    private Map<String, HashSet<AuditProxy>> queryAuditProxyHeartbeat() {
-        Map<String, HashSet<AuditProxy>> result = new HashMap<>();
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement statement = 
connection.prepareStatement(queryAuditProxyHeartbeatSQL);
-                ResultSet resultSet = statement.executeQuery()) {
-            while (resultSet.next()) {
-                String component = resultSet.getString("component");
-                AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
-                result.computeIfAbsent(component, k -> new 
HashSet<>()).add(auditProxyInfo);
-            }
-        } catch (Exception exception) {
-            LOGGER.error("Query audit proxy info has exception! ", exception);
+    public List<AuditProxy> getData(String component) {
+        List<AuditProxy> result = auditProxyCache.get(component);
+        if (result == null) {
+            return new LinkedList<>();
         }
         return result;
     }
 
-    private HashSet<AuditProxy> queryAuditProxyHeartbeat(String component) {
-        HashSet<AuditProxy> result = new HashSet<>();
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement statement = 
connection.prepareStatement(queryAuditProxyHeartbeatByComponentSQL)) {
-            statement.setString(1, component);
-            try (ResultSet resultSet = statement.executeQuery()) {
-                while (resultSet.next()) {
-                    AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
-                    result.add(auditProxyInfo);
-                }
-            } catch (SQLException sqlException) {
-                LOGGER.error("Query audit proxy info by {} has SQL exception 
", component, sqlException);
-            }
-        } catch (Exception exception) {
-            LOGGER.error("Query audit proxy info by {} has  exception ", 
component, exception);
-        }
-        return result;
-    }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index 1c22a28fad..a727eba46b 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -35,8 +35,6 @@ public class OpenApiConstants {
     public static final String DEFAULT_API_GET_IDS_PATH = 
"/audit/query/getIds";
     public static final String KEY_API_GET_AUDIT_PROXY_PATH = 
"api.get.audit.proxy";
     public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH = 
"/audit/query/getAuditProxy";
-    public static final String KEY_API_PROXY_HEART_BEAT_PATH = 
"api.proxy.heartbeat";
-    public static final String DEFAULT_API_PROXY_HEART_BEAT_PATH = 
"/audit/proxy/heartbeat";
     public static final String KEY_API_THREAD_POOL_SIZE = 
"api.thread.pool.size";
     public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
     public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
@@ -51,9 +49,6 @@ public class OpenApiConstants {
     public static final String KEY_API_CACHE_EXPIRED_HOURS = 
"api.cache.expired.hours";
     public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
 
-    public static final String KEY_API_CACHE_EXPIRED_MINUTES = 
"api.cache.expired.minutes";
-    public static final int DEFAULT_API_CACHE_EXPIRED_MINUTES = 1;
-
     // Http config
     public static final String PARAMS_START_TIME = "startTime";
     public static final String PARAMS_END_TIME = "endTime";
@@ -72,6 +67,4 @@ public class OpenApiConstants {
     public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
     public static final int HTTP_RESPOND_CODE = 200;
     public static final String PARAMS_AUDIT_COMPONENT = "component";
-    public static final String PARAMS_AUDIT_HOST = "host";
-    public static final String PARAMS_AUDIT_PORT = "port";
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java
similarity index 53%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java
index 04fc9186df..be689f8c8b 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java
@@ -15,11 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.entities;
+package org.apache.inlong.audit.config;
 
 /**
- * OpenAPI type
+ * Proxy constants
  */
-public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT;
+public class ProxyConstants {
+
+    public static final String PROXY_SEPARATOR = ";";
+    public static final String IP_PORT_SEPARATOR = ":";
+    public static final String KEY_AUDIT_PROXY_ADDRESS_AGENT = 
"audit.proxy.address.agent";
+    public static final String DEFAULT_AUDIT_PROXY_ADDRESS_AGENT = "";
+
+    public static final String KEY_AUDIT_PROXY_ADDRESS_DATAPROXY = 
"audit.proxy.address.dataproxy";
+    public static final String DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY = "";
+
+    public static final String KEY_AUDIT_PROXY_ADDRESS_SORT = 
"audit.proxy.address.sort";
+    public static final String DEFAULT_AUDIT_PROXY_ADDRESS_SORT = "";
+
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 9eb3950ea5..1cc794466c 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -163,68 +163,6 @@ public class SqlConstants {
     public static final String DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL =
             "replace into audit_data_temp (log_ts,inlong_group_id, 
inlong_stream_id, audit_id,audit_tag,count, size, delay) "
                     + " values (?,?,?,?,?,?,?,?)";
-
-    public static final String KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL = 
"audit.proxy.host.query.component.sql";
-    public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL =
-            "select\n" +
-                    "  host as host,\n" +
-                    "  port as port \n" +
-                    "from\n" +
-                    "  audit_proxy_host\n" +
-                    "where\n" +
-                    "  LOWER(component) = LOWER(?)\n" +
-                    "  and status = 1\n" +
-                    "group by\n" +
-                    "  host,\n" +
-                    "  port";
-
-    public static final String KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL = 
"audit.proxy.host.query.all.sql";
-    public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL =
-            "select\n" +
-                    "  component,\n" +
-                    "  host as host,\n" +
-                    "  port as port\n" +
-                    "from\n" +
-                    "  audit_proxy_host\n" +
-                    "where\n" +
-                    "  status = 1\n" +
-                    "group by\n" +
-                    "  component,\n" +
-                    "  host,\n" +
-                    "  port";
-
-    public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL = 
"audit.proxy.heartbeat.query.sql";
-    public static final String 
DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL =
-            "select\n" +
-                    "  host,\n" +
-                    "  port\n" +
-                    "from\n" +
-                    "  audit_proxy_heartbeat\n" +
-                    "where\n" +
-                    "  LOWER(component) = LOWER(?)\n" +
-                    "  and update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
-                    "group by\n" +
-                    "  host,\n" +
-                    "  port ";
-
-    public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL = 
"audit.proxy.heartbeat.query.all.sql";
-    public static final String DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL =
-            "select\n" +
-                    "  host,\n" +
-                    "  port\n" +
-                    "from\n" +
-                    "  audit_proxy_heartbeat\n" +
-                    "where\n" +
-                    "  update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
-                    "group by\n" +
-                    "  host,\n" +
-                    "  port";
-
-    public static final String KEY_AUDIT_PROXY_HEARTBEAT_SQL = 
"audit.proxy.heartbeat.sql";
-    public static final String DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL =
-            "replace into audit_proxy_heartbeat (component, host, port)\n" +
-                    "values (?, ?, ?)";
-
     public static final String KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL = 
"audit.data.temp.add.partition.sql";
     public static final String DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL =
             "ALTER TABLE audit_data_temp ADD PARTITION (PARTITION %s VALUES 
LESS THAN (TO_DAYS('%s')))";
@@ -233,4 +171,7 @@ public class SqlConstants {
     public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
             "ALTER TABLE audit_data_temp DROP PARTITION %s";
 
+    public static final String KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL = 
"audit.data.temp.check.partition.sql";
+    public static final String DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
+            "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE 
TABLE_NAME = 'audit_data_temp' and PARTITION_NAME = ?";
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
index 04fc9186df..d87b7e3aa5 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
@@ -21,5 +21,5 @@ package org.apache.inlong.audit.entities;
  * OpenAPI type
  */
 public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT;
+    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
deleted file mode 100644
index 1a46d757f0..0000000000
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.heartbeat;
-
-import org.apache.inlong.audit.cache.AuditProxyCache;
-import org.apache.inlong.audit.config.Configuration;
-import org.apache.inlong.audit.entities.JdbcConfig;
-import org.apache.inlong.audit.utils.JdbcUtils;
-
-import org.apache.commons.dbcp.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_SQL;
-
-public class ProxyHeartbeat {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditProxyCache.class);
-    private static final ProxyHeartbeat instance = new ProxyHeartbeat();
-    private final BasicDataSource dataSource = new BasicDataSource();
-    private final String heartbeatSQL;
-
-    ProxyHeartbeat() {
-        heartbeatSQL =
-                Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_SQL, 
DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL);
-        initDataSource();
-    }
-
-    private void initDataSource() {
-        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
-        setDataSourceConfig(jdbcConfig);
-        dataSource.setTestOnBorrow(true);
-        dataSource.setValidationQuery("SELECT 1");
-        
dataSource.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
-                DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
-    }
-
-    private void setDataSourceConfig(JdbcConfig jdbcConfig) {
-        dataSource.setDriverClassName(jdbcConfig.getDriverClass());
-        dataSource.setUrl(jdbcConfig.getJdbcUrl());
-        dataSource.setUsername(jdbcConfig.getUserName());
-        dataSource.setPassword(jdbcConfig.getPassword());
-        
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
-                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
-        
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
-                DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
-        
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
-                DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
-        
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
-                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
-    }
-
-    public static ProxyHeartbeat getInstance() {
-        return instance;
-    }
-
-    public void heartbeat(String component, String host, int port) {
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement statement = 
connection.prepareStatement(heartbeatSQL)) {
-            statement.setString(1, component);
-            statement.setString(2, host);
-            statement.setInt(3, port);
-            statement.executeUpdate();
-        } catch (SQLException exception) {
-            LOGGER.error("Heartbeat {} {} {} has exception ", component, host, 
port, exception);
-        }
-    }
-}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index 3861ff60a2..fa1c56ab23 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -27,9 +27,7 @@ import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.ApiType;
 import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.StatData;
-import org.apache.inlong.audit.entity.AuditComponent;
 import org.apache.inlong.audit.entity.AuditProxy;
-import org.apache.inlong.audit.heartbeat.ProxyHeartbeat;
 
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.Gson;
@@ -57,7 +55,6 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_ID
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_PROXY_HEART_BEAT_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
@@ -69,7 +66,6 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PA
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_PROXY_HEART_BEAT_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
@@ -79,9 +75,7 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CO
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_HOST;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_PORT;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
@@ -96,13 +90,17 @@ import static 
org.apache.inlong.audit.entities.ApiType.GET_IDS;
 import static org.apache.inlong.audit.entities.ApiType.GET_IPS;
 import static org.apache.inlong.audit.entities.ApiType.HOUR;
 import static org.apache.inlong.audit.entities.ApiType.MINUTES;
-import static org.apache.inlong.audit.entities.ApiType.PROXY_HEARTBEAT;
 
 public class ApiService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ApiService.class);
 
     public void start() {
+        if (!AuditProxyCache.getInstance().init()) {
+            LOGGER.error("Audit Proxy cache init failed! exit...");
+            System.exit(1);
+        }
+
         initHttpServer();
     }
 
@@ -130,9 +128,6 @@ public class ApiService {
             server.createContext(
                     
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH, 
DEFAULT_API_GET_AUDIT_PROXY_PATH),
                     new AuditHandler(GET_AUDIT_PROXY));
-            server.createContext(
-                    
Configuration.getInstance().get(KEY_API_PROXY_HEART_BEAT_PATH, 
DEFAULT_API_PROXY_HEART_BEAT_PATH),
-                    new AuditHandler(PROXY_HEARTBEAT));
             server.start();
             LOGGER.info("Init http server success. Bind port is: {}", 
bindPort);
         } catch (Exception e) {
@@ -202,7 +197,6 @@ public class ApiService {
                 }
             }
             params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG);
-            params.putIfAbsent(PARAMS_AUDIT_COMPONENT, 
AuditComponent.COMMON_AUDIT.getComponent());
             return params;
         }
 
@@ -228,10 +222,8 @@ public class ApiService {
                             && params.containsKey(PARAMS_END_TIME)
                             && params.containsKey(PARAMS_AUDIT_ID)
                             && params.containsKey(PARAMS_IP);
-                case PROXY_HEARTBEAT:
-                    return params.containsKey(PARAMS_AUDIT_HOST) && 
params.containsKey(PARAMS_AUDIT_PORT);
                 case GET_AUDIT_PROXY:
-                    return true;
+                    return params.containsKey(PARAMS_AUDIT_COMPONENT);
                 default:
                     return false;
             }
@@ -294,11 +286,6 @@ public class ApiService {
                                 
AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT));
                         responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(auditProxy));
                         break;
-                    case PROXY_HEARTBEAT:
-                        
ProxyHeartbeat.getInstance().heartbeat(params.get(PARAMS_AUDIT_COMPONENT),
-                                params.get(PARAMS_AUDIT_HOST), 
Integer.parseInt(params.get(PARAMS_AUDIT_PORT)));
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(new LinkedList<>()));
-                        break;
                     default:
                         LOGGER.error("Unsupported interface type! type is {}", 
apiType);
                         responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(new LinkedList<>()));
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index d6d4781d3d..ac9afc50d4 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -31,6 +31,8 @@ import javax.sql.DataSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.concurrent.Executors;
@@ -63,8 +65,10 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
 
 /**
@@ -83,6 +87,7 @@ public class JdbcSink implements AutoCloseable {
 
     private final DateTimeFormatter FORMATTER_YYMMDDHH = 
DateTimeFormatter.ofPattern("yyyyMMdd");
     private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private final String checkPartitionSql;
 
     public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
         this.dataQueue = dataQueue;
@@ -93,6 +98,9 @@ public class JdbcSink implements AutoCloseable {
 
         pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT,
                 DEFAULT_QUEUE_PULL_TIMEOUT);
+        checkPartitionSql = 
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL,
+                DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL);
+
     }
 
     /**
@@ -108,6 +116,9 @@ public class JdbcSink implements AutoCloseable {
                 TimeUnit.MILLISECONDS);
         if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS,
                 DEFAULT_ENABLE_MANAGE_PARTITIONS)) {
+            // Create MySQL data partition of today
+            addPartition(0);
+
             
partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions,
                     0,
                     
Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS,
@@ -178,7 +189,9 @@ public class JdbcSink implements AutoCloseable {
     }
 
     private void managePartitions() {
-        addPartition();
+        // Create MySQL data partition of tomorrow
+        addPartition(1);
+
         deletePartition();
     }
 
@@ -186,9 +199,13 @@ public class JdbcSink implements AutoCloseable {
         return "p" + date.format(FORMATTER_YYMMDDHH);
     }
 
-    private void addPartition() {
-        String partitionName = 
formatPartitionName(LocalDate.now().plusDays(1));
-        String partitionValue = 
LocalDate.now().plusDays(2).format(FORMATTER_YY_MM_DD_HH);
+    private void addPartition(long daysToAdd) {
+        String partitionName = 
formatPartitionName(LocalDate.now().plusDays(daysToAdd));
+        if (isPartitionExists(partitionName)) {
+            LOGGER.info("Partition [{}] is exist, dont`t need add this 
partition.", partitionName);
+            return;
+        }
+        String partitionValue = LocalDate.now().plusDays(daysToAdd + 
1).format(FORMATTER_YY_MM_DD_HH);
         String addPartitionSQL = String.format(
                 
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
                         DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL),
@@ -200,6 +217,10 @@ public class JdbcSink implements AutoCloseable {
         int daysToSubtract = 
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS,
                 DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
         String partitionName = 
formatPartitionName(LocalDate.now().minusDays(daysToSubtract));
+        if (!isPartitionExists(partitionName)) {
+            LOGGER.info("Partition [{}] is not exist, dont`t need delete this 
partition.", partitionName);
+            return;
+        }
         String deletePartitionSQL = String.format(
                 
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
                         DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL),
@@ -217,6 +238,24 @@ public class JdbcSink implements AutoCloseable {
         }
     }
 
+    private boolean isPartitionExists(String partitionName) {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(checkPartitionSql)) {
+            statement.setString(1, partitionName);
+
+            try (ResultSet resultSet = statement.executeQuery()) {
+                if (resultSet.next()) {
+                    return resultSet.getInt("count") > 0;
+                }
+            } catch (SQLException sqlException) {
+                LOGGER.error("An error occurred while checking partition [{}] 
existence:", partitionName, sqlException);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("An exception occurred while checking partition 
[{}]existence:", partitionName, exception);
+        }
+        return false;
+    }
+
     public void destroy() {
         sinkTimer.shutdown();
     }
diff --git a/inlong-audit/conf/audit-service.properties 
b/inlong-audit/conf/audit-service.properties
index 4d74ff6959..6c24c96105 100644
--- a/inlong-audit/conf/audit-service.properties
+++ b/inlong-audit/conf/audit-service.properties
@@ -19,4 +19,10 @@
 #  mysql config
 
mysql.jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true
 mysql.username=root
-mysql.password=inlong
\ No newline at end of file
+mysql.password=inlong
+
+# Audit Proxy config.
+# If there are multiple addresses, please configure address according to the 
format: address;address,for example: 127.0.0.1:10081;127.0.0.2:10081
+audit.proxy.address.agent=127.0.0.1:10081
+audit.proxy.address.dataproxy=127.0.0.1:10081
+audit.proxy.address.sort=127.0.0.1:10081
\ No newline at end of file
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql 
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index 57016d9558..e9e114e2e0 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -58,20 +58,21 @@ DEFAULT CHARSET = UTF8 COMMENT ='Inlong audit data table';
 -- You can create daily partitions or hourly partitions through the log_ts 
field.
 -- The specific partition type is determined based on the actual data volume.
 -- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_data_temp`
-(
-    `log_ts`           datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT 
'log timestamp',
-    `inlong_group_id`  varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong group id',
-    `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong stream id',
-    `audit_id`         varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
-    `audit_tag`        varchar(100) DEFAULT '' COMMENT 'Audit tag',
-    `count`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
count',
-    `size`             BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
size',
-    `delay`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
delay count',
-    `update_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
-    PRIMARY KEY 
(`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`)
+CREATE TABLE IF NOT EXISTS `audit_data_temp` (
+    `log_ts`            datetime NOT NULL DEFAULT '0000-00-00 00:00:00' 
COMMENT 'log timestamp',
+    `audit_id`          varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
+    `inlong_group_id`   varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong group id',
+    `inlong_stream_id`  varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong stream id',
+    `audit_tag`         varchar(100) DEFAULT '' COMMENT 'Audit tag',
+    `count`             BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count',
+    `size`              BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size',
+    `delay`             BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay 
count',
+    `update_time`       timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Update time',
+    PRIMARY KEY 
(`log_ts`,`audit_id`,`inlong_group_id`,`inlong_stream_id`,`audit_tag`)
 ) ENGINE = InnoDB
-DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data temp table';
+DEFAULT CHARSET = UTF8  COMMENT ='InLong audit data temp table'
+PARTITION BY RANGE (to_days(`log_ts`))
+(PARTITION pDefault VALUES LESS THAN (TO_DAYS('1970-01-01')));
 
 -- ----------------------------
 -- Table structure for audit_data_day
@@ -132,30 +133,3 @@ CREATE TABLE IF NOT EXISTS `audit_source_config`
      PRIMARY KEY (`source_name`, `jdbc_url`)
 ) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config';
 
--- ----------------------------
--- Table structure for audit proxy heartbeat
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_proxy_heartbeat`
-(
-    `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name, 
such as: Agent, Sort...',
-    `host` varchar(64) NOT NULL COMMENT 'Audit proxy IP',
-    `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
-    `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Update time',
-    PRIMARY KEY (`component`, `host`, `port`)
-) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Proxy Heartbeat';
-
-
--- ----------------------------
--- Table structure for audit proxy host
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_proxy_host`
-(
-    `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name, 
such as: Agent, Sort...',
-    `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Component instance, can 
be ip, name',
-    `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
-    `status` int(11) DEFAULT '1' COMMENT 'Audit source config status. 
0:Offline,1:Online',
-    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create 
time',
-    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`component`, `host`, `port`)
-) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Porxy Host';
-

Reply via email to