This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 09a1f073ec [INLONG-10321][Audit] Audit supports the Audit Proxy service discovery and management (#10386)
09a1f073ec is described below

commit 09a1f073ec2829290b557c654b9451341b2eec53
Author: doleyzi <[email protected]>
AuthorDate: Tue Jun 11 19:22:25 2024 +0800

    [INLONG-10321][Audit] Audit supports the Audit Proxy service discovery and management (#10386)
    
    * Audit supports the Audit Proxy service discovery and management
    
    * ADD SQL
---
 .../inlong/audit/entities/AuditComponent.java}     |  17 +-
 .../apache/inlong/audit/file/ConfigManager.java    |   8 +
 .../apache/inlong/audit/heartbeat/Heartbeat.java   | 135 ++++++++++++
 .../org/apache/inlong/audit/node/Application.java  |   5 +
 .../apache/inlong/audit/cache/AuditProxyCache.java | 234 +++++++++++++++++++++
 .../inlong/audit/config/OpenApiConstants.java      |  11 +-
 .../apache/inlong/audit/config/SqlConstants.java   |  61 ++++++
 .../org/apache/inlong/audit/entities/ApiType.java  |   2 +-
 .../entities/{ApiType.java => AuditProxy.java}     |  14 +-
 .../entities/{ApiType.java => Heartbeat.java}      |  15 +-
 .../inlong/audit/heartbeat/ProxyHeartbeat.java     |  96 +++++++++
 .../apache/inlong/audit/service/ApiService.java    |  57 ++++-
 inlong-audit/sql/apache_inlong_audit_mysql.sql     |  27 +++
 13 files changed, 655 insertions(+), 27 deletions(-)

diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entities/AuditComponent.java
similarity index 73%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to 
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entities/AuditComponent.java
index 08ba84b7c7..d7e077e3fa 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entities/AuditComponent.java
@@ -17,9 +17,16 @@
 
 package org.apache.inlong.audit.entities;
 
-/**
- * OpenAPI type
- */
-public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+public enum AuditComponent {
+
+    AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), 
COMMON_AUDIT("Common");
+    private final String component;
+
+    AuditComponent(String component) {
+        this.component = component;
+    }
+
+    public String getComponent() {
+        return component;
+    }
 }
diff --git 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
index 2847836da2..68745efd23 100644
--- 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -94,6 +94,14 @@ public class ConfigManager {
         return null;
     }
 
+    public String getValue(String key) {
+        ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES);
+        if (holder != null) {
+            return holder.getHolder().get(key);
+        }
+        return null;
+    }
+
     private boolean updatePropertiesHolder(Map<String, String> result,
             String holderName, boolean addElseRemove) {
         if (StringUtils.isNotEmpty(holderName)) {
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
new file mode 100644
index 0000000000..ce921f6051
--- /dev/null
+++ 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.heartbeat;
+
+import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.common.util.NetworkUtils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.audit.entities.AuditComponent.COMMON_AUDIT;
+
+public class Heartbeat {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Heartbeat.class);
+    private final static String HEARTBEAT_PATH = "/audit/proxy/heartbeat";
+    private static final String AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY = 
"audit.heartbeat.interval";
+    private static final String AUDIT_SERVICE_HOST_CONFIG_KEY = 
"audit.service.host";
+    private static final String AUDIT_SERVICE_PORT_CONFIG_KEY = 
"agent1.sources.tcp-source.port";
+    private static final String AUDIT_COMPONENT_CONFIG_KEY = "audit.component";
+    private String heartbeatHost;
+    private final ScheduledExecutorService timer = 
Executors.newSingleThreadScheduledExecutor();
+    private final String localIp;
+    private final String localPort;
+
+    public Heartbeat() {
+        localIp = NetworkUtils.getLocalIp();
+        localPort = getLocalPort();
+    }
+
+    public void Start() {
+        heartbeatHost = getConfiguredValue(AUDIT_SERVICE_HOST_CONFIG_KEY);
+        timer.scheduleWithFixedDelay(this::heartbeat,
+                1,
+                getConfiguredInterval(),
+                TimeUnit.MINUTES);
+    }
+
+    private void heartbeat() {
+        if (heartbeatHost == null || localPort == null) {
+            LOGGER.info("Heartbeat is not configure, Don`t need heartbeat");
+            return;
+        }
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            URIBuilder uriBuilder = new URIBuilder("http://" + heartbeatHost + HEARTBEAT_PATH);
+            uriBuilder.setParameter("component", getConfiguredComponent());
+            uriBuilder.setParameter("host", localIp);
+            uriBuilder.setParameter("port", localPort);
+
+            HttpGet httpGet = new HttpGet(uriBuilder.build());
+
+            try (CloseableHttpResponse response = httpClient.execute(httpGet)) 
{
+                String responseBody = 
EntityUtils.toString(response.getEntity());
+                LOGGER.info("Heartbeat response: {}", responseBody);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Heartbeat has exception", exception);
+        }
+    }
+
+    private int getConfiguredInterval() {
+        String intervalConfigValue = 
getConfiguredValue(AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY);
+        return intervalConfigValue != null ? 
Integer.parseInt(intervalConfigValue) : 1;
+    }
+
+    private String getConfiguredComponent() {
+        String intervalConfigComponent = 
getConfiguredValue(AUDIT_COMPONENT_CONFIG_KEY);
+        return intervalConfigComponent != null ? intervalConfigComponent : 
COMMON_AUDIT.getComponent();
+    }
+
+    private String getConfiguredValue(String configKey) {
+        return ConfigManager.getInstance().getValue(configKey);
+    }
+
+    private String getLocalPort() {
+        try (Stream<Path> paths = Files.walk(Paths.get("."))) {
+            Path selectedPath = paths.filter(Files::isRegularFile)
+                    .filter(p -> p.toString().endsWith(".conf"))
+                    .findFirst()
+                    .orElse(null);
+            if (selectedPath != null) {
+                String port = loadProperties(selectedPath);
+                LOGGER.info("File: {} , TCP Source Port: {}", selectedPath, 
port);
+                if (port != null) {
+                    return port;
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.error("Get local port has error", e);
+        }
+        return null;
+    }
+
+    private String loadProperties(Path path) {
+        Properties prop = new Properties();
+        try {
+            prop.load(Files.newInputStream(path));
+            return prop.getProperty(AUDIT_SERVICE_PORT_CONFIG_KEY);
+        } catch (IOException e) {
+            LOGGER.error("Load properties has error", e);
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index 4ab7f29e51..fcd32fb493 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++ 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.audit.node;
 
 import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.heartbeat.Heartbeat;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
@@ -76,6 +77,8 @@ public class Application {
     private MonitorService monitorServer;
     private final ReentrantLock lifecycleLock = new ReentrantLock();
 
+    private static final Heartbeat heartbeat = new Heartbeat();
+
     public Application() {
         this(new ArrayList<LifecycleAware>(0));
     }
@@ -335,6 +338,8 @@ public class Application {
                 
application.handleConfigurationEvent(configurationProvider.getConfiguration());
             }
 
+            heartbeat.Start();
+
             // start application
             application.start();
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
new file mode 100644
index 0000000000..b51c2b2ff1
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.cache;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditProxy;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.utils.JdbcUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_MINUTES;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_MINUTES;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL;
+
+public class AuditProxyCache {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditProxyCache.class);
+    private static final AuditProxyCache instance = new AuditProxyCache();
+    private final Cache<String, HashSet<AuditProxy>> cache;
+    protected final ScheduledExecutorService monitorTimer = 
Executors.newSingleThreadScheduledExecutor();
+    private final BasicDataSource dataSource = new BasicDataSource();
+    private final String queryAllAuditProxyHostSQL;
+    private final String queryAuditProxyHostByComponentSQL;
+
+    private final String queryAuditProxyHeartbeatSQL;
+    private final String queryAuditProxyHeartbeatByComponentSQL;
+
+    private AuditProxyCache() {
+        cache = Caffeine.newBuilder()
+                
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
+                        DEFAULT_API_CACHE_MAX_SIZE))
+                
.expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_MINUTES,
+                        DEFAULT_API_CACHE_EXPIRED_MINUTES), TimeUnit.MINUTES)
+                .build();
+        queryAuditProxyHostByComponentSQL = 
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL,
+                DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL);
+
+        queryAllAuditProxyHostSQL = 
Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL,
+                DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL);
+        queryAuditProxyHeartbeatSQL = 
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL,
+                DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL);
+        queryAuditProxyHeartbeatByComponentSQL =
+                
Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL,
+                        DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL);
+
+        initDataSource();
+        monitorTimer.scheduleWithFixedDelay(new Runnable() {
+
+            @Override
+            public void run() {
+                update();
+            }
+        }, 0, 1, TimeUnit.MINUTES);
+    }
+
+    private void initDataSource() {
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+        dataSource.setDriverClassName(jdbcConfig.getDriverClass());
+        dataSource.setUrl(jdbcConfig.getJdbcUrl());
+        dataSource.setUsername(jdbcConfig.getUserName());
+        dataSource.setPassword(jdbcConfig.getPassword());
+        
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+        
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
+                DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
+        
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
+                DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
+        
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+        dataSource.setTestOnBorrow(true);
+        dataSource.setValidationQuery("SELECT 1");
+        dataSource
+                
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
+                        DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+    }
+
+    public static AuditProxyCache getInstance() {
+        return instance;
+    }
+
+    public List<AuditProxy> getData(String component) {
+        HashSet<AuditProxy> result = cache.getIfPresent(component);
+        if (result != null) {
+            return new ArrayList<>(result);
+        }
+        result = queryAuditProxyInfo(component);
+        if (result.isEmpty()) {
+            result = queryAuditProxyHeartbeat(component);
+        }
+        if (!result.isEmpty()) {
+            cache.put(component, result);
+        }
+        return new ArrayList<>(result);
+    }
+
+    private void update() {
+        Map<String, HashSet<AuditProxy>> auditProxyInfo = 
queryAllAuditProxyInfo();
+        if (auditProxyInfo.isEmpty()) {
+            auditProxyInfo = queryAuditProxyHeartbeat();
+        }
+        if (auditProxyInfo.isEmpty()) {
+            return;
+        }
+        for (Map.Entry<String, HashSet<AuditProxy>> entry : 
auditProxyInfo.entrySet()) {
+            try {
+                cache.put(entry.getKey(), entry.getValue());
+            } catch (Exception e) {
+                LOGGER.error("Put data into audit proxy cache has exception! 
", e);
+                // Decide whether to break or continue based on your 
requirement
+                break;
+            }
+        }
+    }
+
+    private Map<String, HashSet<AuditProxy>> queryAllAuditProxyInfo() {
+        Map<String, HashSet<AuditProxy>> result = new HashMap<>();
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(queryAllAuditProxyHostSQL);
+                ResultSet resultSet = statement.executeQuery()) {
+            while (resultSet.next()) {
+                String component = resultSet.getString("component");
+                AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+                result.computeIfAbsent(component, k -> new 
HashSet<>()).add(auditProxyInfo);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Query audit proxy info has exception! ", exception);
+        }
+        return result;
+    }
+
+    private HashSet<AuditProxy> queryAuditProxyInfo(String component) {
+        HashSet<AuditProxy> result = new HashSet<>();
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(queryAuditProxyHostByComponentSQL)) {
+            statement.setString(1, component);
+            try (ResultSet resultSet = statement.executeQuery()) {
+                while (resultSet.next()) {
+                    AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+                    result.add(auditProxyInfo);
+                }
+            } catch (SQLException sqlException) {
+                LOGGER.error("Query audit proxy info by {} has SQL exception 
", component, sqlException);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Query audit proxy info by {} has  exception ", 
component, exception);
+        }
+        return result;
+    }
+
+    private Map<String, HashSet<AuditProxy>> queryAuditProxyHeartbeat() {
+        Map<String, HashSet<AuditProxy>> result = new HashMap<>();
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(queryAuditProxyHeartbeatSQL);
+                ResultSet resultSet = statement.executeQuery()) {
+            while (resultSet.next()) {
+                String component = resultSet.getString("component");
+                AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+                result.computeIfAbsent(component, k -> new 
HashSet<>()).add(auditProxyInfo);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Query audit proxy info has exception! ", exception);
+        }
+        return result;
+    }
+
+    private HashSet<AuditProxy> queryAuditProxyHeartbeat(String component) {
+        HashSet<AuditProxy> result = new HashSet<>();
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(queryAuditProxyHeartbeatByComponentSQL)) {
+            statement.setString(1, component);
+            try (ResultSet resultSet = statement.executeQuery()) {
+                while (resultSet.next()) {
+                    AuditProxy auditProxyInfo = new 
AuditProxy(resultSet.getString("host"), resultSet.getInt("port"));
+                    result.add(auditProxyInfo);
+                }
+            } catch (SQLException sqlException) {
+                LOGGER.error("Query audit proxy info by {} has SQL exception 
", component, sqlException);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Query audit proxy info by {} has  exception ", 
component, exception);
+        }
+        return result;
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index 05643c43bf..1c22a28fad 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -33,6 +33,10 @@ public class OpenApiConstants {
     public static final String DEFAULT_API_GET_IPS_PATH = 
"/audit/query/getIps";
     public static final String KEY_API_GET_IDS_PATH = "api.get.ids.path";
     public static final String DEFAULT_API_GET_IDS_PATH = 
"/audit/query/getIds";
+    public static final String KEY_API_GET_AUDIT_PROXY_PATH = 
"api.get.audit.proxy";
+    public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH = 
"/audit/query/getAuditProxy";
+    public static final String KEY_API_PROXY_HEART_BEAT_PATH = 
"api.proxy.heartbeat";
+    public static final String DEFAULT_API_PROXY_HEART_BEAT_PATH = 
"/audit/proxy/heartbeat";
     public static final String KEY_API_THREAD_POOL_SIZE = 
"api.thread.pool.size";
     public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
     public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
@@ -47,6 +51,9 @@ public class OpenApiConstants {
     public static final String KEY_API_CACHE_EXPIRED_HOURS = 
"api.cache.expired.hours";
     public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
 
+    public static final String KEY_API_CACHE_EXPIRED_MINUTES = 
"api.cache.expired.minutes";
+    public static final int DEFAULT_API_CACHE_EXPIRED_MINUTES = 1;
+
     // Http config
     public static final String PARAMS_START_TIME = "startTime";
     public static final String PARAMS_END_TIME = "endTime";
@@ -64,5 +71,7 @@ public class OpenApiConstants {
     public static final String KEY_HTTP_SERVER_BIND_PORT = 
"api.http.server.bind.port";
     public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
     public static final int HTTP_RESPOND_CODE = 200;
-    public static final String DEFAULT_PARAMS_AUDIT_TAG = "";
+    public static final String PARAMS_AUDIT_COMPONENT = "component";
+    public static final String PARAMS_AUDIT_HOST = "host";
+    public static final String PARAMS_AUDIT_PORT = "port";
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index f0eebca6de..036781d773 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -164,4 +164,65 @@ public class SqlConstants {
             "replace into audit_data_temp (log_ts,inlong_group_id, 
inlong_stream_id, audit_id,audit_tag,count, size, delay) "
                     + " values (?,?,?,?,?,?,?,?)";
 
+    public static final String KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL = 
"audit.proxy.host.query.component.sql";
+    public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL =
+            "select\n" +
+                    "  host as host,\n" +
+                    "  port as port \n" +
+                    "from\n" +
+                    "  audit_proxy_host\n" +
+                    "where\n" +
+                    "  LOWER(component) = LOWER(?)\n" +
+                    "  and status = 1\n" +
+                    "group by\n" +
+                    "  host,\n" +
+                    "  port";
+
+    public static final String KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL = 
"audit.proxy.host.query.all.sql";
+    public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL =
+            "select\n" +
+                    "  component,\n" +
+                    "  host as host,\n" +
+                    "  port as port\n" +
+                    "from\n" +
+                    "  audit_proxy_host\n" +
+                    "where\n" +
+                    "  status = 1\n" +
+                    "group by\n" +
+                    "  component,\n" +
+                    "  host,\n" +
+                    "  port";
+
+    public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL = 
"audit.proxy.heartbeat.query.sql";
+    public static final String 
DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL =
+            "select\n" +
+                    "  host,\n" +
+                    "  port\n" +
+                    "from\n" +
+                    "  audit_proxy_heartbeat\n" +
+                    "where\n" +
+                    "  LOWER(component) = LOWER(?)\n" +
+                    "  and update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
+                    "group by\n" +
+                    "  host,\n" +
+                    "  port ";
+
+    public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL = 
"audit.proxy.heartbeat.query.all.sql";
+    public static final String DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL =
+            "select\n" +
+                    "  host,\n" +
+                    "  port\n" +
+                    "from\n" +
+                    "  audit_proxy_heartbeat\n" +
+                    "where\n" +
+                    "  update_time > (NOW() - INTERVAL 2 MINUTE)\n" +
+                    "group by\n" +
+                    "  host,\n" +
+                    "  port";
+
+    public static final String KEY_AUDIT_PROXY_HEARTBEAT_SQL = 
"audit.proxy.heartbeat.sql";
+    public static final String DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL =
+            "replace into audit_proxy_heartbeat (component, host, port)\n" +
+                    "values (?, ?, ?)";
+
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
index 08ba84b7c7..04fc9186df 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
@@ -21,5 +21,5 @@ package org.apache.inlong.audit.entities;
  * OpenAPI type
  */
 public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT;
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditProxy.java
similarity index 84%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditProxy.java
index 08ba84b7c7..c594ae2ffe 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditProxy.java
@@ -17,9 +17,13 @@
 
 package org.apache.inlong.audit.entities;
 
-/**
- * OpenAPI type
- */
-public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class AuditProxy {
+
+    private String host;
+    private int port;
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Heartbeat.java
similarity index 82%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Heartbeat.java
index 08ba84b7c7..985e3365ab 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Heartbeat.java
@@ -17,9 +17,14 @@
 
 package org.apache.inlong.audit.entities;
 
-/**
- * OpenAPI type
- */
-public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Heartbeat {
+
+    private String component;
+    private String host;
+    private int port;
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
new file mode 100644
index 0000000000..1a46d757f0
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.heartbeat;
+
+import org.apache.inlong.audit.cache.AuditProxyCache;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.utils.JdbcUtils;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_SQL;
+
+/**
+ * Persists Audit Proxy heartbeats to the audit database.
+ *
+ * <p>A single shared instance is created eagerly and exposed through {@link #getInstance()}.
+ * It owns a DBCP {@link BasicDataSource} whose settings are read from {@link Configuration},
+ * and it executes the configured heartbeat SQL once per reported heartbeat.
+ */
+public class ProxyHeartbeat {
+
+    // Bound to this class (not AuditProxyCache) so heartbeat failures are attributed correctly in the logs.
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyHeartbeat.class);
+    private static final ProxyHeartbeat instance = new ProxyHeartbeat();
+    private final BasicDataSource dataSource = new BasicDataSource();
+    // SQL executed for every heartbeat; bound with three positional parameters (component, host, port).
+    private final String heartbeatSQL;
+
+    /**
+     * Reads the heartbeat SQL from configuration (falling back to the built-in default)
+     * and configures the connection pool.
+     */
+    ProxyHeartbeat() {
+        heartbeatSQL =
+                Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_SQL, DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL);
+        initDataSource();
+    }
+
+    /**
+     * Configures the pooled data source from the MySQL JDBC settings and enables
+     * connection validation on borrow.
+     */
+    private void initDataSource() {
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+        setDataSourceConfig(jdbcConfig);
+        // Validate each connection with "SELECT 1" before handing it out of the pool.
+        dataSource.setTestOnBorrow(true);
+        dataSource.setValidationQuery("SELECT 1");
+        dataSource.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
+                DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+    }
+
+    /**
+     * Applies driver, URL, credentials and pool-size limits to the data source.
+     *
+     * @param jdbcConfig JDBC connection settings to apply
+     */
+    private void setDataSourceConfig(JdbcConfig jdbcConfig) {
+        dataSource.setDriverClassName(jdbcConfig.getDriverClass());
+        dataSource.setUrl(jdbcConfig.getJdbcUrl());
+        dataSource.setUsername(jdbcConfig.getUserName());
+        dataSource.setPassword(jdbcConfig.getPassword());
+        // The initial pool size reuses the min-idle setting.
+        dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+        dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
+                DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
+        dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
+                DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
+        dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+                DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+    }
+
+    /**
+     * Returns the shared instance.
+     *
+     * @return the singleton {@code ProxyHeartbeat}
+     */
+    public static ProxyHeartbeat getInstance() {
+        return instance;
+    }
+
+    /**
+     * Records one heartbeat by executing the configured heartbeat SQL.
+     *
+     * <p>Any {@link SQLException} is logged and not propagated, so a database failure
+     * never reaches the caller.
+     *
+     * @param component component name, bound as SQL parameter 1
+     * @param host audit proxy host, bound as SQL parameter 2
+     * @param port audit proxy port, bound as SQL parameter 3
+     */
+    public void heartbeat(String component, String host, int port) {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = connection.prepareStatement(heartbeatSQL)) {
+            statement.setString(1, component);
+            statement.setString(2, host);
+            statement.setInt(3, port);
+            statement.executeUpdate();
+        } catch (SQLException exception) {
+            LOGGER.error("Heartbeat {} {} {} has exception ", component, host, port, exception);
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index 3474df4968..8693539282 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.audit.service;
 
+import org.apache.inlong.audit.cache.AuditProxyCache;
 import org.apache.inlong.audit.cache.DayCache;
 import org.apache.inlong.audit.cache.HalfHourCache;
 import org.apache.inlong.audit.cache.HourCache;
@@ -24,8 +25,11 @@ import org.apache.inlong.audit.cache.RealTimeQuery;
 import org.apache.inlong.audit.cache.TenMinutesCache;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.ApiType;
+import org.apache.inlong.audit.entities.AuditComponent;
 import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.AuditProxy;
 import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.heartbeat.ProxyHeartbeat;
 
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.Gson;
@@ -48,20 +52,24 @@ import java.util.concurrent.Executors;
 
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_PROXY_HEART_BEAT_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_PROXY_HEART_BEAT_PATH;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
@@ -69,8 +77,11 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_HOST;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_PORT;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
@@ -80,14 +91,17 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
 import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
 import static org.apache.inlong.audit.entities.ApiType.DAY;
+import static org.apache.inlong.audit.entities.ApiType.GET_AUDIT_PROXY;
 import static org.apache.inlong.audit.entities.ApiType.GET_IDS;
 import static org.apache.inlong.audit.entities.ApiType.GET_IPS;
 import static org.apache.inlong.audit.entities.ApiType.HOUR;
 import static org.apache.inlong.audit.entities.ApiType.MINUTES;
+import static org.apache.inlong.audit.entities.ApiType.PROXY_HEARTBEAT;
 
 public class ApiService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ApiService.class);
+
     public void start() {
         initHttpServer();
     }
@@ -113,6 +127,12 @@ public class ApiService {
                     new AuditHandler(GET_IDS));
             
server.createContext(Configuration.getInstance().get(KEY_API_GET_IPS_PATH, 
DEFAULT_API_GET_IPS_PATH),
                     new AuditHandler(GET_IPS));
+            server.createContext(
+                    
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH, 
DEFAULT_API_GET_AUDIT_PROXY_PATH),
+                    new AuditHandler(GET_AUDIT_PROXY));
+            server.createContext(
+                    
Configuration.getInstance().get(KEY_API_PROXY_HEART_BEAT_PATH, 
DEFAULT_API_PROXY_HEART_BEAT_PATH),
+                    new AuditHandler(PROXY_HEARTBEAT));
             server.start();
             LOGGER.info("Init http server success. Bind port is: {}", 
bindPort);
         } catch (Exception e) {
@@ -136,6 +156,7 @@ public class ApiService {
 
         @Override
         public void handle(HttpExchange exchange) {
+            LOGGER.info("handle {}", exchange.getRequestURI().toString());
             if (null != limiter) {
                 limiter.acquire();
             }
@@ -181,6 +202,7 @@ public class ApiService {
                 }
             }
             params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG);
+            params.putIfAbsent(PARAMS_AUDIT_COMPONENT, 
AuditComponent.COMMON_AUDIT.getComponent());
             return params;
         }
 
@@ -206,6 +228,10 @@ public class ApiService {
                             && params.containsKey(PARAMS_END_TIME)
                             && params.containsKey(PARAMS_AUDIT_ID)
                             && params.containsKey(PARAMS_IP);
+                case PROXY_HEARTBEAT:
+                    return params.containsKey(PARAMS_AUDIT_HOST) && 
params.containsKey(PARAMS_AUDIT_PORT);
+                case GET_AUDIT_PROXY:
+                    return true;
                 default:
                     return false;
             }
@@ -219,11 +245,14 @@ public class ApiService {
         }
 
         private void handleLegalParams(JsonObject responseJson, Map<String, 
String> params) {
-            List<StatData> statData = null;
+            responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, true);
+            responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "");
+            Gson gson = new Gson();
+            List<StatData> statData;
             try {
                 switch (apiType) {
                     case MINUTES:
-                        statData = handleMinutesApi(params);
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(handleMinutesApi(params)));
                         break;
                     case HOUR:
                         statData = 
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
@@ -232,6 +261,7 @@ public class ApiService {
                                 params.get(PARAMS_INLONG_STREAM_Id),
                                 params.get(PARAMS_AUDIT_ID),
                                 params.get(PARAMS_AUDIT_TAG));
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
                         break;
                     case DAY:
                         statData = DayCache.getInstance().getData(
@@ -240,6 +270,7 @@ public class ApiService {
                                 params.get(PARAMS_INLONG_GROUP_Id),
                                 params.get(PARAMS_INLONG_STREAM_Id),
                                 params.get(PARAMS_AUDIT_ID));
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
                         break;
                     case GET_IDS:
                         statData = RealTimeQuery.getInstance().queryIdsByIp(
@@ -247,6 +278,7 @@ public class ApiService {
                                 params.get(PARAMS_END_TIME),
                                 params.get(PARAMS_IP),
                                 params.get(PARAMS_AUDIT_ID));
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
                         break;
                     case GET_IPS:
                         statData = RealTimeQuery.getInstance().queryIpsById(
@@ -255,21 +287,26 @@ public class ApiService {
                                 params.get(PARAMS_INLONG_GROUP_Id),
                                 params.get(PARAMS_INLONG_STREAM_Id),
                                 params.get(PARAMS_AUDIT_ID));
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
+                        break;
+                    case GET_AUDIT_PROXY:
+                        List<AuditProxy> auditProxy =
+                                
AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT));
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(auditProxy));
+                        break;
+                    case PROXY_HEARTBEAT:
+                        
ProxyHeartbeat.getInstance().heartbeat(params.get(PARAMS_AUDIT_COMPONENT),
+                                params.get(PARAMS_AUDIT_HOST), 
Integer.parseInt(params.get(PARAMS_AUDIT_PORT)));
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(new LinkedList<>()));
                         break;
                     default:
                         LOGGER.error("Unsupported interface type! type is {}", 
apiType);
+                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(new LinkedList<>()));
                 }
             } catch (Exception exception) {
                 LOGGER.error("Handle legal params has exception ", exception);
+                responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new 
LinkedList<>()));
             }
-
-            if (null == statData)
-                statData = new LinkedList<>();
-
-            responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, true);
-            responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "");
-            Gson gson = new Gson();
-            responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
         }
 
         private List<StatData> handleMinutesApi(Map<String, String> params) {
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql 
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index 333052c97f..57016d9558 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -132,3 +132,30 @@ CREATE TABLE IF NOT EXISTS `audit_source_config`
      PRIMARY KEY (`source_name`, `jdbc_url`)
 ) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config';
 
+-- ----------------------------
+-- Table structure for audit proxy heartbeat (one row per component/host/port; update_time auto-refreshes on update)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_proxy_heartbeat`
+(
+    `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name, 
such as: Agent, Sort...',
+    `host` varchar(64) NOT NULL COMMENT 'Audit proxy IP',
+    `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
+    `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Update time',
+    PRIMARY KEY (`component`, `host`, `port`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Proxy Heartbeat';
+
+
+-- ----------------------------
+-- Table structure for audit proxy host (one row per component/host/port, with an online/offline status flag)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_proxy_host`
+(
+    `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name, such as: Agent, Sort...',
+    `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name',
+    `port` bigint NOT NULL COMMENT 'Audit Proxy Port',
+    `status` int(11) DEFAULT '1' COMMENT 'Audit proxy host status. 0:Offline,1:Online',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`component`, `host`, `port`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Proxy Host';
+

Reply via email to