This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d4716983b [INLONG-9977][Audit] Audit-service increases the 
capabilities of openapi (#9978)
3d4716983b is described below

commit 3d4716983b93df61dd5fe88a8de953b85c79a413
Author: doleyzi <[email protected]>
AuthorDate: Thu Apr 11 21:36:34 2024 +0800

    [INLONG-9977][Audit] Audit-service increases the capabilities of openapi 
(#9978)
---
 .../apache/inlong/audit/cache/AbstractCache.java   |  22 ++-
 .../org/apache/inlong/audit/cache/DayCache.java    |  47 ++---
 .../org/apache/inlong/audit/channel/DataQueue.java |   4 +-
 .../inlong/audit/config/ConfigConstants.java       |  61 ++----
 .../apache/inlong/audit/config/Configuration.java  |  16 +-
 .../inlong/audit/config/OpenApiConstants.java      |  63 ++++++
 .../apache/inlong/audit/config/SqlConstants.java   |  43 ++--
 .../JdbcConfig.java}                               |  19 +-
 .../org/apache/inlong/audit/entities/StatData.java |   1 +
 .../org/apache/inlong/audit/main/Application.java  |  57 ++++++
 ...erImpl.java => SelectorChangeListenerImpl.java} |   8 +-
 .../audit/selector/api/SelectorChangeListener.java |   2 +-
 .../inlong/audit/selector/api/SelectorConfig.java  |   8 +-
 .../inlong/audit/selector/api/SelectorFactory.java |   4 +-
 .../inlong/audit/selector/impl/DBDataSource.java   |  54 ++---
 .../inlong/audit/selector/impl/SelectorImpl.java   |  14 +-
 .../inlong/audit/selector/task/DBMonitorTask.java  |  22 +--
 .../apache/inlong/audit/service/ApiService.java    | 220 +++++++++++++++++++++
 .../apache/inlong/audit/service/EtlService.java    | 149 ++++++++------
 .../org/apache/inlong/audit/sink/CacheSink.java    |   4 +-
 .../org/apache/inlong/audit/sink/JdbcSink.java     |   4 +-
 .../org/apache/inlong/audit/source/JdbcSource.java |  49 ++---
 .../org/apache/inlong/audit/utils/JdbcUtils.java   |  84 ++++++++
 inlong-audit/bin/service-start.sh                  |  85 ++++++++
 inlong-audit/bin/service-stop.sh                   |  42 ++++
 inlong-audit/conf/audit-service.properties         |  40 ++++
 inlong-audit/pom.xml                               |   1 +
 .../apcache_inlong_audit_aggregate.sql             |  12 ++
 28 files changed, 868 insertions(+), 267 deletions(-)

diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
index a492a9269b..7a3ac871bd 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -20,30 +20,30 @@ package org.apache.inlong.audit.cache;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.StatData;
-import org.apache.inlong.audit.source.JdbcSource;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_MAX_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_EXPIRED_HOURS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_MAX_SIZE;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
 
 /**
  * Abstract cache.
  */
 public class AbstractCache {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractCache.class);
     protected final Cache<String, StatData> cache;
     protected final ScheduledExecutorService monitorTimer = 
Executors.newSingleThreadScheduledExecutor();
     protected AuditCycle auditCycle;
@@ -82,7 +82,11 @@ public class AbstractCache {
      * @return
      */
     public List<StatData> getData(String key) {
-        return Arrays.asList(cache.getIfPresent(key));
+        StatData statData = cache.getIfPresent(key);
+        if (null == statData) {
+            return new LinkedList<>();
+        }
+        return Collections.singletonList(statData);
     }
 
     /**
@@ -97,6 +101,6 @@ public class AbstractCache {
      * Monitor
      */
     private void monitor() {
-        LOG.info("{} api local cache size={}", auditCycle, 
cache.estimatedSize());
+        LOGGER.info("{} api local cache size={}", auditCycle, 
cache.estimatedSize());
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
index cd8ceda479..06ca4b75ae 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
@@ -18,7 +18,9 @@
 package org.apache.inlong.audit.cache;
 
 import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
 import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.JdbcUtils;
 
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
@@ -33,7 +35,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Objects;
 
 import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
@@ -44,11 +45,6 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_C
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
-import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
@@ -61,7 +57,7 @@ import static 
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY
  */
 public class DayCache implements AutoCloseable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DayCache.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DayCache.class);
     private static volatile DayCache dayCache = null;
     private DataSource dataSource;
 
@@ -114,21 +110,21 @@ public class DayCache implements AutoCloseable {
             try (ResultSet resultSet = pstat.executeQuery()) {
                 while (resultSet.next()) {
                     StatData data = new StatData();
-                    data.setLogTs(startTime);
-                    data.setInlongGroupId(resultSet.getString(1));
-                    data.setInlongStreamId(resultSet.getString(2));
-                    data.setAuditId(resultSet.getString(3));
-                    data.setAuditTag(resultSet.getString(4));
-                    data.setCount(resultSet.getLong(5));
-                    data.setSize(resultSet.getLong(6));
-                    data.setDelay(resultSet.getLong(7));
+                    data.setLogTs(resultSet.getString(1));
+                    data.setInlongGroupId(resultSet.getString(2));
+                    data.setInlongStreamId(resultSet.getString(3));
+                    data.setAuditId(resultSet.getString(4));
+                    data.setAuditTag(resultSet.getString(5));
+                    data.setCount(resultSet.getLong(6));
+                    data.setSize(resultSet.getLong(7));
+                    data.setDelay(resultSet.getLong(8));
                     result.add(data);
                 }
             } catch (SQLException sqlException) {
-                LOG.error("Query has SQL exception! ", sqlException);
+                LOGGER.error("Query has SQL exception! ", sqlException);
             }
         } catch (Exception exception) {
-            LOG.error("Query has exception! ", exception);
+            LOGGER.error("Query has exception! ", exception);
         }
         return result;
     }
@@ -137,20 +133,13 @@ public class DayCache implements AutoCloseable {
      * Create data source
      */
     private void createDataSource() {
-        String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, 
KEY_DEFAULT_MYSQL_DRIVER);
-        String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
-        String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
-        String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
-        assert (Objects.nonNull(driver)
-                && Objects.nonNull(jdbcUrl)
-                && Objects.nonNull(userName)
-                && Objects.nonNull(passWord));
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
 
         HikariConfig config = new HikariConfig();
-        config.setDriverClassName(driver);
-        config.setJdbcUrl(jdbcUrl);
-        config.setUsername(userName);
-        config.setPassword(passWord);
+        config.setDriverClassName(jdbcConfig.getDriverClass());
+        config.setJdbcUrl(jdbcConfig.getJdbcUrl());
+        config.setUsername(jdbcConfig.getUserName());
+        config.setPassword(jdbcConfig.getPassword());
         
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
                 DEFAULT_CONNECTION_TIMEOUT));
         config.addDataSourceProperty(CACHE_PREP_STMTS,
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
index 3169e9d377..31217c586b 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class DataQueue {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DataQueue.class);
 
     private final LinkedBlockingQueue<StatData> queue;
 
@@ -65,6 +65,6 @@ public class DataQueue {
         if (queue != null) {
             queue.clear();
         }
-        LOG.info("destroy channel!");
+        LOGGER.info("destroy channel!");
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index 64797df982..8209a1c814 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -37,46 +37,18 @@ public class ConfigConstants {
     public static final String KEY_MYSQL_PASSWORD = "mysql.password";
 
     // Time config
-    public static final String KEY_QUERY_SQL_TIME_OUT = "query.sql.timeout";
-    public static final int DEFAULT_QUERY_SQL_TIME_OUT = 300;
-    public static final String KEY_JDBC_TIME_OUT = "jdbc.timeout.second";
-    public static final int DEFAULT_JDBC_TIME_OUT = 300;
     public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT = 
"datasource.connection.timeout.ms";
     public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5;
-    public static final String KEY_API_RESPONSE_TIMEOUT = 
"api.response.timeout";
-    public static final int DEFAULT_API_TIMEOUT = 30;
     public static final String KEY_QUEUE_PULL_TIMEOUT = 
"queue.pull.timeout.ms";
     public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
 
     // Interval config
-    public static final String KEY_SOURCE_CLICKHOUSE_STAT_INTERVAL = 
"source.clickhouse.stat.interval.minute";
-    public static final int DEFAULT_SOURCE_CLICKHOUSE_STAT_INTERVAL = 1;
     public static final String KEY_SOURCE_DB_STAT_INTERVAL = 
"source.db.stat.interval.minute";
     public static final int DEFAULT_SOURCE_DB_STAT_INTERVAL = 1;
     public static final String KEY_SOURCE_DB_SINK_INTERVAL = 
"sink.db.interval.ms";
     public static final int DEFAULT_SOURCE_DB_SINK_INTERVAL = 100;
     public static final String KEY_SOURCE_DB_SINK_BATCH = "sink.db.batch";
     public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
-    public static final String KEY_SOURCE_DB_SINK_INTERNAL = 
"sink.db.internal.ms";
-    public static final int DEFAULT_SOURCE_DB_SINK_INTERNAL = 100;
-
-    // Api config
-    public static final String KEY_HOUR_API_PATH = "hour.api.path";
-    public static final String DEFAULT_HOUR_API_PATH = "/audit/query/hour";
-    public static final String KEY_DAY_API_PATH = "day.api.path";
-    public static final String DEFAULT_DAY_API_PATH = "/audit/query/day";
-    public static final String KEY_DAY_API_TABLE = "day.api.table";
-    public static final String DEFAULT_DAY_API_TABLE = "audit_data_day";
-    public static final String KEY_MINUTE_API_TABLE = "minute.api.table";
-    public static final String DEFAULT_MINUTE_API_TABLE = "audit_data_temp";
-    public static final String KEY_MINUTE_10_API_PATH = "minute.10.api.path";
-    public static final String DEFAULT_MINUTE_10_API_PATH = 
"/audit/query/minute/10";
-    public static final String KEY_MINUTE_30_API_PATH = "minute.30.api.path";
-    public static final String DEFAULT_MINUTE_30_API_PATH = 
"/audit/query/minute/30";
-    public static final String KEY_API_POOL_SIZE = "api.pool.size";
-    public static final int DEFAULT_POOL_SIZE = 10;
-    public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
-    public static final int DEFAULT_API_BACKLOG_SIZE = 100;
 
     public static final String KEY_DATASOURCE_POOL_SIZE = 
"datasource.pool.size";
     public static final int DEFAULT_DATASOURCE_POOL_SIZE = 1000;
@@ -87,19 +59,14 @@ public class ConfigConstants {
     public static final String DEFAULT_AUDIT_IDS = "3;4;5;6";
 
     // Summary config
-    public static final String KEY_REALTIME_SUMMARY_SOURCE_TABLE = 
"realtime.summary.source.table";
-    public static final String DEFAULT_REALTIME_SUMMARY_SOURCE_TABLE = 
"audit_data";
-    public static final String KEY_REALTIME_SUMMARY_SINK_TABLE = 
"realtime.summary.sink.table";
-    public static final String DEFAULT_REALTIME_SUMMARY_SINK_TABLE = 
"audit_data_temp";
-    public static final String KEY_REALTIME_SUMMARY_STAT_BACK_TIMES = 
"realtime.summary.stat.back.times";
-    public static final int DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES = 6;
-
-    public static final String KEY_DAILY_SUMMARY_SOURCE_TABLE = 
"daily.summary.source.table";
-    public static final String DEFAULT_DAILY_SUMMARY_SOURCE_TABLE = 
"audit_data_temp";
-    public static final String KEY_DAILY_SUMMARY_SINK_TABLE = 
"daily.summary.sink.table";
-    public static final String DEFAULT_DAILY_SUMMARY_SINK_TABLE = 
"audit_data_day";
-    public static final String KEY_DAILY_SUMMARY_STAT_BACK_TIMES = 
"daily.summary.stat.back.times";
-    public static final int DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES = 2;
+    public static final String KEY_SUMMARY_REALTIME_STAT_BACK_TIMES = 
"summary.realtime.stat.back.times";
+    public static final int DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES = 6;
+
+    public static final String KEY_SUMMARY_DAILY_STAT_BACK_TIMES = 
"summary.daily.stat.back.times";
+    public static final int DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES = 2;
+
+    public static final String KEY_STAT_BACK_INITIAL_OFFSET = 
"stat.back.initial.offset";
+    public static final int DEFAULT_STAT_BACK_INITIAL_OFFSET = 0;
 
     // HA selector config
     public static final String KEY_RELEASE_LEADER_INTERVAL = 
"release.leader.interval";
@@ -107,6 +74,11 @@ public class ConfigConstants {
     public static final String KEY_SELECTOR_THREAD_POOL_SIZE = 
"selector.thread.pool.size";
     public static final int DEFAULT_SELECTOR_THREAD_POOL_SIZE = 3;
 
+    public static final String KEY_SELECTOR_SERVICE_ID = "selector.service.id";
+    public static final String DEFAULT_SELECTOR_SERVICE_ID = "audit-service";
+    public static final String KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS = 
"selector.follower.listen.cycle.ms";
+    public static final int DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS = 2000;
+
     // HikariConfig
     public static final String CACHE_PREP_STMTS = "cachePrepStmts";
     public static final String PREP_STMT_CACHE_SIZE = "prepStmtCacheSize";
@@ -124,11 +96,4 @@ public class ConfigConstants {
     public static final int MAX_INIT_COUNT = 2;
     public static final int RANDOM_BOUND = 10;
 
-    // Cache config
-    public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size";
-    public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;
-
-    public static final String KEY_API_CACHE_EXPIRED_HOURS = 
"api.cache.expired.hours";
-    public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
-
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
index c04aaf6aac..acfa076de2 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
@@ -29,7 +29,7 @@ import java.util.Properties;
  */
 public class Configuration {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(Configuration.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Configuration.class);
     public static final String DEFAULT_CONFIG_FILE = 
"conf/audit-service.properties";
 
     private static volatile Configuration conf = null;
@@ -42,7 +42,7 @@ public class Configuration {
         try (FileInputStream fileInputStream = new 
FileInputStream(DEFAULT_CONFIG_FILE)) {
             properties.load(fileInputStream);
         } catch (Exception e) {
-            LOG.error("Configuration has exception!", e);
+            LOGGER.error("Configuration has exception!", e);
         }
     }
 
@@ -87,6 +87,18 @@ public class Configuration {
         return value == null ? defaultValue : (Integer) value;
     }
 
+    /**
+     * Get double value
+     *
+     * @param key
+     * @param defaultValue
+     * @return
+     */
+    public double get(String key, double defaultValue) {
+        Object value = properties.get(key);
+        return value == null ? defaultValue : (Double) value;
+    }
+
     /**
      * @param key
      * @return
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
new file mode 100644
index 0000000000..8f6a656c17
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.config;
+
+/**
+ * Open api constants
+ */
+public class OpenApiConstants {
+
+    // Api config
+    public static final String KEY_API_HOUR_PATH = "api.hour.path";
+    public static final String DEFAULT_API_HOUR_PATH = "/audit/query/hour";
+    public static final String KEY_API_DAY_PATH = "api.day.path";
+    public static final String DEFAULT_API_DAY_PATH = "/audit/query/day";
+    public static final String KEY_API_MINUTE_10_PATH = "api.minute.10.path";
+    public static final String DEFAULT_API_MINUTE_10_PATH = 
"/audit/query/minute/10";
+    public static final String KEY_API_MINUTE_30_PATH = "api.minute.30.path";
+    public static final String DEFAULT_API_MINUTE_30_PATH = 
"/audit/query/minute/30";
+    public static final String KEY_API_POOL_SIZE = "api.pool.size";
+    public static final int DEFAULT_POOL_SIZE = 10;
+    public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
+    public static final int DEFAULT_API_BACKLOG_SIZE = 100;
+    public static final String KEY_API_REAL_LIMITER_QPS = 
"api.real.limiter.qps";
+    public static final double DEFAULT_API_REAL_LIMITER_QPS = 100.0;
+
+    // Cache config
+    public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size";
+    public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;
+
+    public static final String KEY_API_CACHE_EXPIRED_HOURS = 
"api.cache.expired.hours";
+    public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
+
+    // Http config
+    public static final String START_TIME = "startTime";
+    public static final String END_TIME = "endTime";
+    public static final String AUDIT_ID = "auditId";
+    public static final String AUDIT_TAG = "auditTag";
+    public static final String INLONG_GROUP_Id = "inlongGroupId";
+    public static final String INLONG_STREAM_Id = "inlongStreamId";
+    public static final String KEY_HTTP_BODY_SUCCESS = "success";
+    public static final String KEY_HTTP_BODY_ERR_MSG = "errMsg";
+    public static final String KEY_HTTP_BODY_ERR_DATA = "data";
+    public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
+    public static final String VALUE_HTTP_HEADER_CONTENT_TYPE = 
"application/json;charset=utf-8";
+    public static final int BIND_PORT = 80;
+    public static final int HTTP_RESPOND_CODE = 200;
+    public static final String DEFAULT_AUDIT_TAG = "";
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 31d73aad81..04b4d8a4ed 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -37,34 +37,25 @@ public class SqlConstants {
     // ClickHouse query sql
     public static final String KEY_CLICKHOUSE_SOURCE_QUERY_SQL = 
"clickhouse.source.query.sql";
     public static final String DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL =
-            "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
-                    "    , sum(cnt) AS cnt, sum(size) AS size\n" +
-                    "    , sum(delay) AS delay\n" +
+            "SELECT inlong_group_id, inlong_stream_id, audit_id\n" +
+                    "\t, audit_tag, cnt, size, delay,MAX(audit_version)\n" +
                     "FROM (\n" +
-                    "    SELECT max(audit_version), ip, docker_id, 
thread_id\n" +
-                    "        , inlong_group_id, inlong_stream_id, audit_id, 
audit_tag, cnt\n" +
-                    "        , size, delay\n" +
-                    "    FROM (\n" +
-                    "        SELECT audit_version, ip, docker_id, thread_id, 
inlong_group_id\n" +
-                    "            , inlong_stream_id, audit_id, audit_tag, 
sum(count) AS cnt\n" +
-                    "            , sum(size) AS size, sum(delay) AS delay\n" +
-                    "        FROM (\n" +
-                    "            SELECT audit_version, docker_id, thread_id, 
sdk_ts, packet_id\n" +
-                    "                , log_ts, ip, inlong_group_id, 
inlong_stream_id, audit_id\n" +
-                    "                , audit_tag, count, size, delay\n" +
-                    "            FROM audit_data \n" +
-                    "            WHERE log_ts BETWEEN ? AND ? \n" +
-                    "                AND audit_id = ? \n" +
-                    "            GROUP BY audit_version, docker_id, thread_id, 
sdk_ts, packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, 
audit_tag, count, size, delay\n"
-                    +
-                    "        ) t1\n" +
-                    "        GROUP BY audit_version, ip, docker_id, thread_id, 
inlong_group_id, inlong_stream_id, audit_id, audit_tag\n"
+                    "\tSELECT audit_version, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
+                    "\t\t, SUM(count) AS cnt, SUM(size) AS size\n" +
+                    "\t\t, SUM(delay) AS delay\n" +
+                    "\tFROM (\n" +
+                    "\t\tSELECT audit_version, docker_id, thread_id, sdk_ts, 
packet_id\n" +
+                    "\t\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, 
audit_id\n" +
+                    "\t\t\t, audit_tag, count, size, delay\n" +
+                    "\t\tFROM audit_data\n" +
+                    "\t\tWHERE log_ts BETWEEN ? AND ? \n" +
+                    "\t\t\tAND audit_id = ? \n" +
+                    "\t\tGROUP BY audit_version, docker_id, thread_id, sdk_ts, 
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, 
count, size, delay\n"
                     +
-                    "    ) t2\n" +
-                    "    GROUP BY ip, docker_id, thread_id, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag, cnt, size, delay\n"
-                    +
-                    ") t3\n" +
-                    "GROUP BY inlong_group_id, inlong_stream_id, audit_id, 
audit_tag";
+                    "\t) t1\n" +
+                    "\tGROUP BY audit_version, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
+                    ") t2\n" +
+                    "GROUP BY inlong_group_id, inlong_stream_id, audit_id, 
audit_tag, cnt, size, delay";
 
     // Mysql query sql
     public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL = 
"mysql.query.temp.sql";
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/JdbcConfig.java
similarity index 77%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/JdbcConfig.java
index f914269150..ae9487cbbe 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/JdbcConfig.java
@@ -15,12 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.selector.api;
+package org.apache.inlong.audit.entities;
 
-/**
- * Selector change listener
- */
-public abstract interface SelectorChangeListener {
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class JdbcConfig {
 
-    public abstract void leaderChanged(boolean paramBoolean);
-}
\ No newline at end of file
+    String driverClass;
+    String JdbcUrl;
+    String userName;
+    String password;
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
index b5f450b27d..c6faed687c 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
 @Data
 public class StatData {
 
+    private String auditVersion;
     private String logTs;
     private String inlongGroupId;
     private String inlongStreamId;
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
new file mode 100644
index 0000000000..e2cace012c
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.main;
+
+import org.apache.inlong.audit.service.ApiService;
+import org.apache.inlong.audit.service.EtlService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Application {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Application.class);
+    private static final EtlService etlService = new EtlService();
+    private static final ApiService apiService = new ApiService();
+
+    public static void main(String[] args) {
+        try {
+            // Etl service aggregate the data from the data source and store 
the aggregated data to the target storage
+            etlService.start();
+
+            // Api service provide audit data interface to external services
+            apiService.start();
+
+            stopIfKilled();
+        } catch (Exception ex) {
+            LOGGER.error("Running exception: ", ex);
+        }
+    }
+
+    private static void stopIfKilled() {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                etlService.stop();
+                apiService.stop();
+                LOGGER.info("Stopping gracefully");
+            } catch (Exception ex) {
+                LOGGER.error("Stop error: ", ex);
+            }
+        }));
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/SelectorChangeListenerImpl.java
similarity index 79%
rename from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
rename to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/SelectorChangeListenerImpl.java
index d579250975..f9ff758d68 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/SelectorChangeListenerImpl.java
@@ -23,13 +23,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Elector change listener impl
+ * Selector change listener impl
  */
-public class ElectorChangeListenerImpl implements SelectorChangeListener {
+public class SelectorChangeListenerImpl implements SelectorChangeListener {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ElectorChangeListenerImpl.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SelectorChangeListenerImpl.class);
 
     public void leaderChanged(boolean currentNodeIsLeader) {
-        logger.info("LeaderChanged {}:", currentNodeIsLeader);
+        LOGGER.info("Leader changed {}:", currentNodeIsLeader);
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
index f914269150..064b84ee2e 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
@@ -22,5 +22,5 @@ package org.apache.inlong.audit.selector.api;
  */
 public abstract interface SelectorChangeListener {
 
-    public abstract void leaderChanged(boolean paramBoolean);
+    public abstract void leaderChanged(boolean currentNodeIsLeader);
 }
\ No newline at end of file
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
index b4dd399021..5110c7b69a 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
@@ -31,7 +31,7 @@ import java.util.Objects;
 @Data
 public class SelectorConfig {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SelectorConfig.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SelectorConfig.class);
     public static final String MONITOR_COMMON_NAME = "audit";
     private final String serviceId;
     private final String leaderId;
@@ -41,7 +41,7 @@ public class SelectorConfig {
     private String dbUrl;
     private String dbUser;
     private String dbPasswd;
-    private String electorDbName = "leader_election";
+    private String selectorDbName = "leader_selector";
     private int leaderTimeout = 20;
     private int tryToBeLeaderInterval = 5;
     private int dbMonitorRunInterval = 20;
@@ -52,7 +52,7 @@ public class SelectorConfig {
     private String cachePrepStmts = "true";
     private int prepStmtCacheSize = 250;
     private int prepStmtCacheSqlLimit = 2048;
-    private String monitorName = "elector_leader_state";
+    private String monitorName = "selector_leader_state";
     private String ip;
     private SelectorChangeListener selectorChangeListener;
 
@@ -79,7 +79,7 @@ public class SelectorConfig {
             try {
                 ip = InetAddress.getLocalHost().getHostAddress();
             } catch (Exception e) {
-                logger.error("Get local ip has exception:{}", e.getMessage());
+                LOGGER.error("Get local ip has exception:{}", e.getMessage());
                 ip = "N/A";
             }
         return ip;
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
index d7a38fbf39..3f107e48c3 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
@@ -24,7 +24,7 @@ import org.apache.inlong.audit.selector.impl.SelectorImpl;
  */
 public class SelectorFactory {
 
-    public static Selector getNewElector(SelectorConfig electorConfig) {
-        return new SelectorImpl(electorConfig);
+    public static Selector getNewElector(SelectorConfig selectorConfig) {
+        return new SelectorImpl(selectorConfig);
     }
 }
\ No newline at end of file
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
index 2751321345..889cf87cb2 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class DBDataSource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(DBDataSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DBDataSource.class);
     private String selectorSql = SqlConstants.SELECTOR_SQL;
     private String replaceLeaderSql = SqlConstants.REPLACE_LEADER_SQL;
     private String reLeaseSql = SqlConstants.RELEASE_SQL;
@@ -63,12 +63,12 @@ public class DBDataSource {
             if (!selectorConfig.isUseDefaultLeader()) {
                 initDataSource();
                 if (needFormatSql) {
-                    formatSql(selectorConfig.getElectorDbName(), 
selectorConfig.getServiceId(),
+                    formatSql(selectorConfig.getSelectorDbName(), 
selectorConfig.getServiceId(),
                             selectorConfig.getLeaderId());
                 }
             }
         } catch (Exception exception) {
-            logger.error(exception.getMessage());
+            LOGGER.error(exception.getMessage());
             throw exception;
         }
     }
@@ -88,7 +88,7 @@ public class DBDataSource {
                 if (datasource == null || datasource.isClosed()) {
                     HikariConfig config = new HikariConfig();
                     config.setDriverClassName(selectorConfig.getDbDriver());
-                    logger.info("Init dataSource:{}", 
selectorConfig.getDbUrl());
+                    LOGGER.info("Init dataSource:{}", 
selectorConfig.getDbUrl());
                     config.setJdbcUrl(selectorConfig.getDbUrl());
                     config.setUsername(selectorConfig.getDbUser());
                     config.setPassword(selectorConfig.getDbPasswd());
@@ -107,7 +107,7 @@ public class DBDataSource {
 
                 initSucc = true;
             } catch (Exception exception) {
-                logger.error("DB url:{},user name:{},password:{},exception:{}",
+                LOGGER.error("DB url:{},user name:{},password:{},exception:{}",
                         selectorConfig.getDbUrl(),
                         selectorConfig.getDbUser(),
                         selectorConfig.getDbPasswd(),
@@ -144,15 +144,15 @@ public class DBDataSource {
                 try (PreparedStatement pstmt = 
connection.prepareStatement(sql)) {
                     result = pstmt.executeUpdate();
                 } catch (Exception executeUpdatEexception) {
-                    logger.error("Exception :{}", 
executeUpdatEexception.getMessage());
+                    LOGGER.error("Exception :{}", 
executeUpdatEexception.getMessage());
                 }
             } catch (Exception pstmtEexception) {
-                logger.error("Exception :{}", pstmtEexception.getMessage());
+                LOGGER.error("Exception :{}", pstmtEexception.getMessage());
             }
             getConnectionFailTimes.set(0);
         } catch (Exception exception) {
             getConnectionFailTimes.addAndGet(1);
-            logger.warn("Get Connection fail. {}", exception.getMessage());
+            LOGGER.warn("Get Connection fail. {}", exception.getMessage());
         }
         return result;
     }
@@ -165,12 +165,12 @@ public class DBDataSource {
             try {
                 int result = executeUpdate(selectorSql);
                 if (result == 2) {
-                    logger.info("{} get the leader", 
selectorConfig.getLeaderId());
+                    LOGGER.info("{} get the leader", 
selectorConfig.getLeaderId());
                 } else if (result == 1) {
-                    logger.info("{} do not get the leader", 
selectorConfig.getLeaderId());
+                    LOGGER.info("{} do not get the leader", 
selectorConfig.getLeaderId());
                 }
             } catch (Exception exception) {
-                logger.error("Exception: {} ,sql:{}", exception.getMessage(), 
selectorSql);
+                LOGGER.error("Exception: {} ,sql:{}", exception.getMessage(), 
selectorSql);
             }
 
         }
@@ -187,13 +187,13 @@ public class DBDataSource {
         try {
             int result = executeUpdate(replaceLeaderSql);
             if (result > 0) {
-                logger.info("Replace leader success.sql:{}", replaceLeaderSql);
+                LOGGER.info("Replace leader success.sql:{}", replaceLeaderSql);
             } else {
-                logger.warn("Replace leader failed. sql:" + replaceLeaderSql);
+                LOGGER.warn("Replace leader failed. sql:" + replaceLeaderSql);
             }
 
         } catch (Exception exception) {
-            logger.error("Exception :{} ", exception.getMessage());
+            LOGGER.error("Exception :{} ", exception.getMessage());
         }
     }
 
@@ -203,12 +203,12 @@ public class DBDataSource {
     public void releaseLeader() {
         try {
             int result = executeUpdate(reLeaseSql);
-            logger.info("ReleaseLeader sql:{}", reLeaseSql);
+            LOGGER.info("ReleaseLeader sql:{}", reLeaseSql);
             if (result == 1) {
-                logger.info("{} release the leader success", 
selectorConfig.getLeaderId());
+                LOGGER.info("{} release the leader success", 
selectorConfig.getLeaderId());
             }
         } catch (Exception exception) {
-            logger.error("ReLease sql:{},exception {}:,", reLeaseSql, 
exception.getMessage());
+            LOGGER.error("ReLease sql:{},exception {}:,", reLeaseSql, 
exception.getMessage());
         }
 
     }
@@ -226,7 +226,7 @@ public class DBDataSource {
 
             try {
                 if (null == datasource || datasource.isClosed()) {
-                    logger.warn("DataSource is closed init is again");
+                    LOGGER.warn("DataSource is closed init is again");
                     initDataSource();
                 }
                 try (Connection connection = datasource.getConnection()) {
@@ -236,13 +236,13 @@ public class DBDataSource {
                             leaderId = resultSet.getString("leader");
                         }
                     } catch (Exception exception) {
-                        logger.error("Exception {}", exception.getMessage());
+                        LOGGER.error("Exception {}", exception.getMessage());
                     }
                 } catch (Throwable connectionException) {
-                    logger.error("Exception {}", 
connectionException.getMessage());
+                    LOGGER.error("Exception {}", 
connectionException.getMessage());
                 }
             } catch (Exception datasourceException) {
-                logger.error("Exception {}", datasourceException.getMessage());
+                LOGGER.error("Exception {}", datasourceException.getMessage());
             }
 
             return leaderId;
@@ -263,7 +263,7 @@ public class DBDataSource {
                 }
                 return false;
             } catch (Exception exception) {
-                logger.error("Exception {}", exception.getMessage());
+                LOGGER.error("Exception {}", exception.getMessage());
                 return true;
             }
         }
@@ -278,14 +278,14 @@ public class DBDataSource {
     public void formatSql(String... params) {
         selectorSql = MessageFormat.format(selectorSql, params);
         selectorSql = selectorSql.replaceAll("#", 
selectorConfig.getLeaderTimeout() + "");
-        logger.info(selectorSql);
+        LOGGER.info(selectorSql);
         replaceLeaderSql = MessageFormat.format(replaceLeaderSql, params);
-        logger.info(replaceLeaderSql);
+        LOGGER.info(replaceLeaderSql);
         reLeaseSql = MessageFormat.format(reLeaseSql, params);
-        logger.info("ReLeaseSql:{}", reLeaseSql);
+        LOGGER.info("ReLeaseSql:{}", reLeaseSql);
         isLeaderSql = MessageFormat.format(isLeaderSql, params);
-        logger.info(isLeaderSql);
+        LOGGER.info(isLeaderSql);
         searchCurrentLeaderSql = MessageFormat.format(searchCurrentLeaderSql, 
params);
-        logger.info(searchCurrentLeaderSql);
+        LOGGER.info(searchCurrentLeaderSql);
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
index 45352be5c2..c62de775db 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class SelectorImpl extends Selector {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SelectorImpl.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SelectorImpl.class);
     private final SelectorConfig selectorConfig;
     private final ExecutorService fixedThreadPool;
     private boolean canElector = true;
@@ -60,7 +60,7 @@ public class SelectorImpl extends Selector {
      */
     public void init() throws Exception {
         try {
-            logger.info("Init selector impl...");
+            LOGGER.info("Init selector impl...");
 
             dbDataSource.init(true);
 
@@ -68,7 +68,7 @@ public class SelectorImpl extends Selector {
 
             fixedThreadPool.execute(new DBMonitorTask(selectorConfig, 
dbDataSource));
         } catch (Exception exception) {
-            logger.error("Failed to init selector", exception);
+            LOGGER.error("Failed to init selector", exception);
         }
     }
 
@@ -89,14 +89,14 @@ public class SelectorImpl extends Selector {
             try {
                 dbDataSource.releaseLeader();
             } catch (Exception exception) {
-                logger.error("Exception :{}", exception.getMessage());
+                LOGGER.error("Exception :{}", exception.getMessage());
             }
 
         try {
             
TimeUnit.SECONDS.sleep(Configuration.getInstance().get(ConfigConstants.KEY_RELEASE_LEADER_INTERVAL,
                     ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL));
         } catch (Exception exception) {
-            logger.error("Exception :{}", exception.getMessage());
+            LOGGER.error("Exception :{}", exception.getMessage());
         }
     }
 
@@ -142,7 +142,7 @@ public class SelectorImpl extends Selector {
             dbDataSource.init(false);
             canSelect(true);
         } catch (Exception exception) {
-            logger.error("Exception :{}", exception.getMessage());
+            LOGGER.error("Exception :{}", exception.getMessage());
             return false;
         }
         return true;
@@ -198,7 +198,7 @@ public class SelectorImpl extends Selector {
                 try {
                     TimeUnit.SECONDS.sleep(sleepTime);
                 } catch (Exception exception) {
-                    logger.error("Exception :{}", exception.getMessage());
+                    LOGGER.error("Exception :{}", exception.getMessage());
                 }
             }
         }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
index c1dc8e3d5c..4d980aa632 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
@@ -30,34 +30,34 @@ import java.util.concurrent.TimeUnit;
  */
 public class DBMonitorTask implements Runnable {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(DBMonitorTask.class);
-    private SelectorConfig electorConfig;
-    private DBDataSource dbDataSource;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DBMonitorTask.class);
+    private final SelectorConfig selectorConfig;
+    private final DBDataSource dbDataSource;
     private int dbClosedTimes = 0;
-    private boolean replaced = true;
+    private final boolean replaced = true;
 
-    public DBMonitorTask(SelectorConfig electorConfig, DBDataSource 
dbDataSource) {
-        this.electorConfig = electorConfig;
+    public DBMonitorTask(SelectorConfig selectorConfig, DBDataSource 
dbDataSource) {
+        this.selectorConfig = selectorConfig;
         this.dbDataSource = dbDataSource;
     }
 
     public void run() {
         try {
             while (true) {
-                logger.info("DB monitor task run once");
-                
TimeUnit.SECONDS.sleep(electorConfig.getDbMonitorRunInterval());
+                LOGGER.info("DB monitor task run once");
+                
TimeUnit.SECONDS.sleep(selectorConfig.getDbMonitorRunInterval());
 
-                if (!(electorConfig.isUseDefaultLeader()))
+                if (!(selectorConfig.isUseDefaultLeader()))
                     break;
             }
             if (dbDataSource.isDBDataSourceClosed()) {
                 dbClosedTimes += 1;
-                logger.info("DB closed times :{}", dbClosedTimes);
+                LOGGER.info("DB closed times :{}", dbClosedTimes);
             } else {
                 dbClosedTimes = 0;
             }
         } catch (Exception e) {
-            logger.error("DB monitor task has exception {}", e.getMessage());
+            LOGGER.error("DB monitor task has exception {}", e.getMessage());
         }
     }
 }
\ No newline at end of file
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
new file mode 100644
index 0000000000..6dac009c70
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.cache.DayCache;
+import org.apache.inlong.audit.cache.HalfHourCache;
+import org.apache.inlong.audit.cache.HourCache;
+import org.apache.inlong.audit.cache.TenMinutesCache;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.CacheUtils;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import static org.apache.inlong.audit.config.OpenApiConstants.AUDIT_ID;
+import static org.apache.inlong.audit.config.OpenApiConstants.AUDIT_TAG;
+import static org.apache.inlong.audit.config.OpenApiConstants.BIND_PORT;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTE_10_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTE_30_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_POOL_SIZE;
+import static org.apache.inlong.audit.config.OpenApiConstants.END_TIME;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
+import static org.apache.inlong.audit.config.OpenApiConstants.INLONG_GROUP_Id;
+import static org.apache.inlong.audit.config.OpenApiConstants.INLONG_STREAM_Id;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
+import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTE_10_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTE_30_PATH;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_POOL_SIZE;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
+import static org.apache.inlong.audit.config.OpenApiConstants.START_TIME;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
+
+public class ApiService {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ApiService.class);
+
+    public void start() {
+        initHttpServer();
+    }
+
+    public void stop() {
+
+    }
+
+    private void initHttpServer() {
+        try {
+            HttpServer server = HttpServer.create(new 
InetSocketAddress(BIND_PORT),
+                    Configuration.getInstance().get(KEY_API_BACKLOG_SIZE, 
DEFAULT_API_BACKLOG_SIZE));
+            server.setExecutor(Executors.newFixedThreadPool(
+                    Configuration.getInstance().get(KEY_API_POOL_SIZE, 
DEFAULT_POOL_SIZE)));
+            
server.createContext(Configuration.getInstance().get(KEY_API_DAY_PATH, 
DEFAULT_API_DAY_PATH),
+                    new AuditHandler(AuditCycle.DAY));
+            
server.createContext(Configuration.getInstance().get(KEY_API_HOUR_PATH, 
DEFAULT_API_HOUR_PATH),
+                    new AuditHandler(AuditCycle.HOUR));
+            server.createContext(
+                    Configuration.getInstance().get(KEY_API_MINUTE_10_PATH, 
DEFAULT_API_MINUTE_10_PATH),
+                    new AuditHandler(AuditCycle.MINUTE_10));
+            
server.createContext(Configuration.getInstance().get(KEY_API_MINUTE_30_PATH, 
DEFAULT_API_MINUTE_30_PATH),
+                    new AuditHandler(AuditCycle.MINUTE_30));
+            server.start();
+        } catch (Exception e) {
+            LOGGER.error("Init http server has exception!", e);
+        }
+    }
+
+    static class AuditHandler implements HttpHandler, AutoCloseable {
+
+        private final AuditCycle apiType;
+        private final RateLimiter limiter;
+
+        public AuditHandler(AuditCycle apiType) {
+            this.apiType = apiType;
+            limiter = 
RateLimiter.create(Configuration.getInstance().get(KEY_API_REAL_LIMITER_QPS,
+                    DEFAULT_API_REAL_LIMITER_QPS));
+        }
+
+        @Override
+        public void handle(HttpExchange exchange) {
+            if (null != limiter) {
+                limiter.acquire();
+            }
+
+            try (OutputStream os = exchange.getResponseBody()) {
+                JsonObject responseJson = new JsonObject();
+
+                Map<String, String> params = 
parseRequestURI(exchange.getRequestURI().getQuery());
+                if (!checkParams(params)) {
+                    handleInvalidParams(responseJson, exchange);
+                } else {
+                    handleLegalParams(responseJson, params);
+                }
+
+                byte[] bytes = 
responseJson.toString().getBytes(StandardCharsets.UTF_8);
+
+                exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
+                        VALUE_HTTP_HEADER_CONTENT_TYPE);
+                exchange.sendResponseHeaders(HTTP_RESPOND_CODE, bytes.length);
+                os.write(bytes);
+            } catch (Exception e) {
+                LOGGER.error("Audit handler has exception!", e);
+            }
+        }
+
+        private Map<String, String> parseRequestURI(String query) {
+            Map<String, String> params = new HashMap<>();
+            if (query != null) {
+                String[] pairs = query.split("&");
+                for (String pair : pairs) {
+                    String[] keyValue = pair.split("=");
+                    if (keyValue.length == 2) {
+                        String key = keyValue[0];
+                        String value = keyValue[1];
+                        params.put(key, value);
+                    }
+                }
+            }
+            params.putIfAbsent(AUDIT_TAG, DEFAULT_AUDIT_TAG);
+            return params;
+        }
+
+        private boolean checkParams(Map<String, String> params) {
+            return params.containsKey(START_TIME)
+                    && params.containsKey(END_TIME)
+                    && params.containsKey(AUDIT_ID)
+                    && params.containsKey(INLONG_GROUP_Id)
+                    && params.containsKey(INLONG_STREAM_Id);
+        }
+
+        private void handleInvalidParams(JsonObject responseJson, HttpExchange 
exchange) {
+            responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, false);
+            responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "Invalid params! " 
+ exchange.getRequestURI());
+            Gson gson = new Gson();
+            responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new 
LinkedList<>()));
+        }
+
+        private void handleLegalParams(JsonObject responseJson, Map<String, 
String> params) {
+            String cacheKey = CacheUtils.buildCacheKey(params.get(START_TIME), 
params.get(INLONG_GROUP_Id),
+                    params.get(INLONG_STREAM_Id), params.get(AUDIT_ID), 
params.get(AUDIT_TAG));
+            LOGGER.info("handleLegalParams cacheKey {}", cacheKey);
+            List<StatData> statData = null;
+            switch (apiType) {
+                case MINUTE_10:
+                    statData = TenMinutesCache.getInstance().getData(cacheKey);
+                    break;
+                case MINUTE_30:
+                    statData = HalfHourCache.getInstance().getData(cacheKey);
+                    break;
+                case HOUR:
+                    statData = HourCache.getInstance().getData(cacheKey);
+                    break;
+                case DAY:
+                    statData = DayCache.getInstance().getData(
+                            params.get(START_TIME),
+                            params.get(END_TIME),
+                            params.get(INLONG_GROUP_Id),
+                            params.get(INLONG_STREAM_Id),
+                            params.get(AUDIT_ID),
+                            params.get(AUDIT_TAG));
+                    break;
+                default:
+                    LOGGER.error("Unsupported interface type! type is {}", 
apiType);
+            }
+            if (null == statData)
+                statData = new LinkedList<>();
+
+            responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, true);
+            responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "");
+            Gson gson = new Gson();
+            responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index 0574ac3771..eb3f40e5b4 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -23,33 +23,33 @@ import org.apache.inlong.audit.cache.TenMinutesCache;
 import org.apache.inlong.audit.channel.DataQueue;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.JdbcConfig;
 import org.apache.inlong.audit.entities.SinkConfig;
 import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.selector.api.Selector;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.api.SelectorFactory;
 import org.apache.inlong.audit.sink.CacheSink;
 import org.apache.inlong.audit.sink.JdbcSink;
 import org.apache.inlong.audit.source.JdbcSource;
+import org.apache.inlong.audit.utils.JdbcUtils;
+import org.apache.inlong.common.util.NetworkUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Objects;
+import java.util.UUID;
 
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_DRIVER;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_JDBC_URL;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_PASSWORD;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_USERNAME;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DAILY_SUMMARY_STAT_BACK_TIMES;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
-import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_REALTIME_SUMMARY_STAT_BACK_TIMES;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_DAY_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL;
@@ -64,7 +64,7 @@ import static 
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY
  */
 public class EtlService {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EtlService.class);
     private JdbcSource mysqlSourceOfTemp;
     private JdbcSource mysqlSourceOfTenMinutesCache;
     private JdbcSource mysqlSourceOfHalfHourCache;
@@ -77,23 +77,27 @@ public class EtlService {
     private CacheSink cacheSinkOfHourCache;
     private final int queueSize;
     private final int statBackTimes;
+    private static Selector selector;
+    private boolean running = true;
 
     public EtlService() {
         queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
                 DEFAULT_DATA_QUEUE_SIZE);
-        statBackTimes = 
Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES,
-                DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES);
+        statBackTimes = 
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+                DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
     }
 
     /**
      * Start the etl service.
      */
     public void start() {
-        clickhouseToMysql();
         mysqlToMysqlOfDay();
         mysqlToTenMinutesCache();
         mysqlToHalfHourCache();
         mysqlToHourCache();
+
+        initSelector();
+        waitToBeLeader();
     }
 
     /**
@@ -104,8 +108,8 @@ public class EtlService {
         DataQueue dataQueue = new DataQueue(queueSize);
 
         mysqlSourceOfTemp = new JdbcSource(dataQueue, 
buildMysqlSourceConfig(AuditCycle.DAY,
-                
Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES,
-                        DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES)));
+                
Configuration.getInstance().get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES,
+                        DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES)));
         mysqlSourceOfTemp.start();
 
         SinkConfig sinkConfig = 
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
@@ -175,21 +179,13 @@ public class EtlService {
      * @return
      */
     private SinkConfig buildMysqlSinkConfig(String insertSql) {
-        String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, 
KEY_DEFAULT_MYSQL_DRIVER);
-        String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
-        String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
-        String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
-        assert (Objects.nonNull(driver)
-                && Objects.nonNull(jdbcUrl)
-                && Objects.nonNull(userName)
-                && Objects.nonNull(passWord));
-
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
         return new SinkConfig(
                 insertSql,
-                driver,
-                jdbcUrl,
-                userName,
-                passWord);
+                jdbcConfig.getDriverClass(),
+                jdbcConfig.getJdbcUrl(),
+                jdbcConfig.getUserName(),
+                jdbcConfig.getPassword());
     }
 
     /**
@@ -198,22 +194,15 @@ public class EtlService {
      * @return
      */
     private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int 
statBackTimes) {
-        String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, 
KEY_DEFAULT_MYSQL_DRIVER);
-        String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
-        String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
-        String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
-        assert (Objects.nonNull(driver)
-                && Objects.nonNull(jdbcUrl)
-                && Objects.nonNull(userName));
-
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
         return new SourceConfig(auditCycle,
                 
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
                         DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
                 statBackTimes,
-                driver,
-                jdbcUrl,
-                userName,
-                passWord);
+                jdbcConfig.getDriverClass(),
+                jdbcConfig.getJdbcUrl(),
+                jdbcConfig.getUserName(),
+                jdbcConfig.getPassword());
     }
 
     /**
@@ -222,30 +211,66 @@ public class EtlService {
      * @return
      */
     private SourceConfig buildClickhouseSourceConfig() {
-        String driver = Configuration.getInstance().get(KEY_CLICKHOUSE_DRIVER, 
DEFAULT_CLICKHOUSE_DRIVER);
-        String jdbcUrl = 
Configuration.getInstance().get(KEY_CLICKHOUSE_JDBC_URL);
-        String userName = 
Configuration.getInstance().get(KEY_CLICKHOUSE_USERNAME);
-        String passWord = 
Configuration.getInstance().get(KEY_CLICKHOUSE_PASSWORD);
-        assert (Objects.nonNull(driver)
-                && Objects.nonNull(jdbcUrl)
-                && Objects.nonNull(userName)
-                && Objects.nonNull(passWord));
-
+        JdbcConfig jdbcConfig = JdbcUtils.buildClickhouseConfig();
         return new SourceConfig(AuditCycle.MINUTE_5,
                 
Configuration.getInstance().get(KEY_CLICKHOUSE_SOURCE_QUERY_SQL,
                         DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL),
-                
Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES,
-                        DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES),
-                driver,
-                jdbcUrl,
-                userName,
-                passWord);
+                
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+                        DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES),
+                jdbcConfig.getDriverClass(),
+                jdbcConfig.getJdbcUrl(),
+                jdbcConfig.getUserName(),
+                jdbcConfig.getPassword());
+    }
+
+    /**
+     * Init selector
+     */
+    private void initSelector() {
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+        String leaderId = NetworkUtils.getLocalIp() + "-" + UUID.randomUUID();
+        LOGGER.info("Init selector. Leader id is :{}", leaderId);
+        if (selector == null) {
+            SelectorConfig electorConfig = new SelectorConfig(
+                    Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID, 
DEFAULT_SELECTOR_SERVICE_ID),
+                    leaderId,
+                    jdbcConfig.getJdbcUrl(),
+                    jdbcConfig.getUserName(), jdbcConfig.getPassword(), 
jdbcConfig.getDriverClass());
+
+            selector = SelectorFactory.getNewElector(electorConfig);
+            try {
+                selector.init();
+            } catch (Exception e) {
+                LOGGER.error("Init selector has exception:", e);
+            }
+        }
+    }
+
+    /**
+     * Wait to be leader
+     */
+    public void waitToBeLeader() {
+        while (running) {
+            try {
+                
Thread.sleep(Configuration.getInstance().get(KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS,
+                        DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS));
+            } catch (Exception e) {
+                LOGGER.error("Wait to be Leader has exception! lost 
Leadership!", e);
+            }
+
+            if (selector.isLeader()) {
+                LOGGER.info("I get Leadership! Begin to aggregate clickhouse 
data to mysql");
+                clickhouseToMysql();
+                return;
+            }
+        }
     }
 
     /**
      * Stop the etl service,and destroy related resources.
      */
     public void stop() {
+        running = false;
         mysqlSourceOfTemp.destroy();
         mysqlSinkOfDay.destroy();
 
@@ -259,5 +284,7 @@ public class EtlService {
         cacheSinkOfTenMinutesCache.destroy();
         cacheSinkOfHalfHourCache.destroy();
         cacheSinkOfHourCache.destroy();
+
+        selector.close();
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
index 8077817b1d..6f5f809a3d 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
@@ -40,7 +40,7 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_
  */
 public class CacheSink {
 
-    private static final Logger LOG = LoggerFactory.getLogger(CacheSink.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CacheSink.class);
     private final ScheduledExecutorService sinkTimer = 
Executors.newSingleThreadScheduledExecutor();
     private final DataQueue dataQueue;
     private final Cache<String, StatData> cache;
@@ -77,7 +77,7 @@ public class CacheSink {
                 data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS);
             }
         } catch (Exception exception) {
-            LOG.error("Process exception! ", exception);
+            LOGGER.error("Process exception! ", exception);
         }
     }
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index 451cbdc50a..6f308ac3a9 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -60,7 +60,7 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL
  */
 public class JdbcSink implements AutoCloseable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcSink.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcSink.class);
     private final ScheduledExecutorService sinkTimer = 
Executors.newSingleThreadScheduledExecutor();
     private final DataQueue dataQueue;
     private final int insertBatch;
@@ -126,7 +126,7 @@ public class JdbcSink implements AutoCloseable {
                 preparedStatement.clearBatch();
             }
         } catch (Exception e) {
-            LOG.error("Process exception! {}", e.getMessage());
+            LOGGER.error("Process exception! {}", e.getMessage());
         }
     }
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index e4e8e1dd0d..ec828a4bf6 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -56,6 +56,7 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_BACK_INITIAL_OFFSET;
 import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_IDS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
@@ -63,8 +64,10 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITIAL_OFFSET;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_AUDIT_TAG;
 import static org.apache.inlong.audit.entities.AuditCycle.DAY;
 import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
 
@@ -74,7 +77,7 @@ import static 
org.apache.inlong.audit.entities.AuditCycle.HOUR;
 @Data
 public class JdbcSource {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcSource.class);
     private final ConcurrentHashMap<Integer, ScheduledExecutorService> 
statTimers = new ConcurrentHashMap<>();
     private DataQueue dataQueue;
     private List<String> auditIds;
@@ -106,12 +109,14 @@ public class JdbcSource {
         if (sourceConfig.getAuditCycle() == DAY) {
             statInterval = HOUR.getValue();
         }
-        for (int statBackTime = 1; statBackTime < 
sourceConfig.getStatBackTimes(); statBackTime++) {
+        int offset = 
Configuration.getInstance().get(KEY_STAT_BACK_INITIAL_OFFSET,
+                DEFAULT_STAT_BACK_INITIAL_OFFSET);
+        for (int statBackTime = 0; statBackTime < 
sourceConfig.getStatBackTimes(); statBackTime++) {
             ScheduledExecutorService timer =
                     statTimers.computeIfAbsent(statBackTime, k -> 
Executors.newSingleThreadScheduledExecutor());
-            timer.scheduleWithFixedDelay(new StatServer(statBackTime),
+            timer.scheduleWithFixedDelay(new StatServer(offset++),
                     0,
-                    statInterval, TimeUnit.MINUTES);
+                    statInterval + statBackTime, TimeUnit.MINUTES);
         }
     }
 
@@ -218,12 +223,12 @@ public class JdbcSource {
 
         public void run() {
             long currentTimestamp = System.currentTimeMillis();
-            LOG.info("Stat source data at {},stat back times:{}", 
currentTimestamp, statBackTimes);
+            LOGGER.info("Stat source data at {},stat back times:{}", 
currentTimestamp, statBackTimes);
 
             statByStep();
 
             long timeCost = System.currentTimeMillis() - currentTimestamp;
-            LOG.info("Stat source data cost time:{}ms,stat back times:{}", 
timeCost, statBackTimes);
+            LOGGER.info("Stat source data cost time:{}ms,stat back times:{}", 
timeCost, statBackTimes);
         }
 
         /**
@@ -255,7 +260,7 @@ public class JdbcSource {
                 long currentTimestamp = System.currentTimeMillis();
                 query(statCycle.getStartTime(), statCycle.getEndTime(), 
auditId);
                 long timeCost = System.currentTimeMillis() - currentTimestamp;
-                LOG.info("[{}]-[{}],{},stat back times:{},audit 
id:{},cost:{}ms",
+                LOGGER.info("[{}]-[{}],{},stat back times:{},audit 
id:{},cost:{}ms",
                         statCycle.getStartTime(), statCycle.getEndTime(),
                         sourceConfig.getAuditCycle(),
                         statBackTimes, auditId, timeCost);
@@ -280,29 +285,27 @@ public class JdbcSource {
                 pstat.setString(3, auditId);
                 try (ResultSet resultSet = pstat.executeQuery()) {
                     while (resultSet.next()) {
-                        String inlongGroupID = resultSet.getString(1);
-                        String InlongStreamID = resultSet.getString(2);
-                        String AuditId = resultSet.getString(3);
-                        String AuditTag = resultSet.getString(4);
-                        long count = resultSet.getLong(5);
-                        long size = resultSet.getLong(6);
-                        long delay = resultSet.getLong(7);
                         StatData data = new StatData();
                         data.setLogTs(startTime);
-                        data.setInlongGroupId(inlongGroupID);
-                        data.setInlongStreamId(InlongStreamID);
-                        data.setAuditId(AuditId);
-                        data.setAuditTag(AuditTag);
-                        data.setCount(count);
-                        data.setSize(size);
-                        data.setDelay(delay);
+                        data.setInlongGroupId(resultSet.getString(1));
+                        data.setInlongStreamId(resultSet.getString(2));
+                        data.setAuditId(resultSet.getString(3));
+                        String auditTag = resultSet.getString(4);
+                        if (null == auditTag) {
+                            data.setAuditTag(DEFAULT_AUDIT_TAG);
+                        } else {
+                            data.setAuditTag(auditTag);
+                        }
+                        data.setCount(resultSet.getLong(5));
+                        data.setSize(resultSet.getLong(6));
+                        data.setDelay(resultSet.getLong(7));
                         dataQueue.push(data);
                     }
                 } catch (SQLException sqlException) {
-                    LOG.error("Query has SQL exception! ", sqlException);
+                    LOGGER.error("Query has SQL exception! ", sqlException);
                 }
             } catch (Exception exception) {
-                LOG.error("Query has exception! ", exception);
+                LOGGER.error("Query has exception! ", exception);
             }
         }
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
new file mode 100644
index 0000000000..97c28d68be
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+
+import java.util.Objects;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_DRIVER;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_JDBC_URL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_PASSWORD;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_USERNAME;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
+
+/**
+ * Jdbc utils
+ */
+public class JdbcUtils {
+
+    /**
+     * Build mysql config
+     * @return
+     */
+    public static JdbcConfig buildMysqlConfig() {
+        return doBuild(Configuration.getInstance().get(KEY_MYSQL_DRIVER, 
KEY_DEFAULT_MYSQL_DRIVER),
+                Configuration.getInstance().get(KEY_MYSQL_JDBC_URL),
+                Configuration.getInstance().get(KEY_MYSQL_USERNAME),
+                Configuration.getInstance().get(KEY_MYSQL_PASSWORD));
+    }
+
+    /**
+     * Build clickhouse config
+     * @return
+     */
+    public static JdbcConfig buildClickhouseConfig() {
+        return doBuild(
+                Configuration.getInstance().get(KEY_CLICKHOUSE_DRIVER, 
DEFAULT_CLICKHOUSE_DRIVER),
+                Configuration.getInstance().get(KEY_CLICKHOUSE_JDBC_URL),
+                Configuration.getInstance().get(KEY_CLICKHOUSE_USERNAME),
+                Configuration.getInstance().get(KEY_CLICKHOUSE_PASSWORD));
+    }
+
+    /**
+     * Do build config
+     * @param driverClass
+     * @param jdbcUrl
+     * @param userName
+     * @param password
+     * @return
+     */
+    private static JdbcConfig doBuild(String driverClass, String jdbcUrl, 
String userName, String password) {
+        assert (Objects.nonNull(driverClass)
+                && Objects.nonNull(jdbcUrl)
+                && Objects.nonNull(userName)
+                && Objects.nonNull(password));
+
+        return new JdbcConfig(
+                driverClass,
+                jdbcUrl,
+                userName,
+                password);
+    }
+}
diff --git a/inlong-audit/bin/service-start.sh 
b/inlong-audit/bin/service-start.sh
new file mode 100644
index 0000000000..f1ea65a31e
--- /dev/null
+++ b/inlong-audit/bin/service-start.sh
@@ -0,0 +1,85 @@
+#!/bin/bash
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+BASE_DIR=$(cd "$(dirname "$0")"/../;pwd)
+
+# Enter the root directory path
+# shellcheck disable=SC2164
+cd "$BASE_DIR"
+
+# Prepare common dependency
+ROOT_DIR=$BASE_DIR/../..
+if [ -e $ROOT_DIR/bin/prepare_module_dependencies.sh ]; then
+    $ROOT_DIR/bin/prepare_module_dependencies.sh ./inlong-audit/lib
+fi
+
+PID=$(ps -ef | grep "audit-service" | grep -v grep | awk '{ print $2}')
+LOG_DIR="${BASE_DIR}/logs"
+
+if [ -n "$PID" ]; then
+ echo "Application has already started."
+ exit 0
+fi
+
+if [[ -z $JAVA_HOME ]]; then
+    JAVA=$(which java)
+    if [ $? != 0 ]; then
+        echo "Error: JAVA_HOME not set, and no java executable found in 
$PATH." 1>&2
+        exit 1
+    fi
+else
+    JAVA=$JAVA_HOME/bin/java
+fi
+
+if [ ! -d "${LOG_DIR}" ]; then
+  mkdir "${LOG_DIR}"
+fi
+
+JAVA_OPTS="-server -XX:SurvivorRatio=2 -XX:+UseParallelGC"
+
+if [ -z "$AUDIT_JVM_HEAP_OPTS" ]; then
+  HEAP_OPTS="-Xms1g -Xmx1g"
+else
+  HEAP_OPTS="$AUDIT_JVM_HEAP_OPTS"
+fi
+JAVA_OPTS="${JAVA_OPTS} ${HEAP_OPTS}"
+
+# shellcheck disable=SC2010
+SERVER_JAR=$(ls -lt "${BASE_DIR}"/lib |grep audit-service | head -2 | tail -1 
| awk '{print $NF}')
+
+nohup "$JAVA" $JAVA_OPTS -Daudit.log.path="$LOG_DIR" 
-Dloader.path="$BASE_DIR/conf,$BASE_DIR/lib/" -jar "$BASE_DIR/lib/$SERVER_JAR"  
1>"${LOG_DIR}"/service.log  2>"${LOG_DIR}"/service-error.log &
+
+PID_FILE="$BASE_DIR/bin/PID"
+
+# shellcheck disable=SC2009
+PID=$(ps -ef | grep "$BASE_DIR" | grep "$SERVER_JAR"| grep -v grep | awk '{ 
print $2}')
+
+sleep 3
+
+if [ -n "$PID" ]; then
+  echo -n "$PID" > "$PID_FILE"
+  echo "Application started."
+  exit 0
+else
+  echo "Application start failed."
+  exit 0
+fi
diff --git a/inlong-audit/bin/service-stop.sh b/inlong-audit/bin/service-stop.sh
new file mode 100644
index 0000000000..75cca34949
--- /dev/null
+++ b/inlong-audit/bin/service-stop.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# this program kills the audit service
+for i in {1..5}
+do
+  pid=$(ps aux | grep "audit-service" | grep -v "grep" | awk '{print $2}')
+  if [ -z "$pid" ]; then
+      echo "audit-service is not running"
+      break
+  else
+      kill $pid
+  fi
+  sleep 10
+done
+
+pid=$(ps aux | grep "audit-service" | grep -v "grep" | awk '{print $2}')
+if [ -z "$pid" ]; then
+    echo "audit-service has stopped"
+else
+    kill -9 $pid
+fi
+
+echo "Stop audit-service successfully."
diff --git a/inlong-audit/conf/audit-service.properties 
b/inlong-audit/conf/audit-service.properties
new file mode 100644
index 0000000000..66fa9e0fd5
--- /dev/null
+++ b/inlong-audit/conf/audit-service.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#  clickhouse config
+clickhouse.jdbc.url=jdbc:clickhouse://*****:***/db_inlong_audit?socket_timeout=600000
+clickhouse.username=*****
+clickhouse.password=*****
+
+#  mysql config
+mysql.jdbc.url=jdbc:mysql://*****:***/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true
+mysql.username=*****
+mysql.password=*****
+
+#  summary config
+summary.realtime.stat.back.times=6
+summary.daily.stat.back.times=2
+audit.ids=3;4;5;6
+
+#  api config
+api.cache.max.size=50000000
+api.cache.expired.hours=12
+api.real.limiter.qps=1000.0
+api.pool.size=10
+api.backlog.size=100
+
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index fa98312e8b..53996d2917 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -33,6 +33,7 @@
     <modules>
         <module>audit-proxy</module>
         <module>audit-store</module>
+        <module>audit-service</module>
         <module>audit-common</module>
         <module>audit-sdk</module>
         <module>audit-release</module>
diff --git a/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql 
b/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
index 5794355533..5e15717bff 100644
--- a/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
+++ b/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
@@ -66,3 +66,15 @@ CREATE TABLE IF NOT EXISTS `audit_data_day`
     PRIMARY KEY 
(`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`)
 ) ENGINE = InnoDB
 DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table';
+
+-- ----------------------------
+-- Table structure for selector
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `leader_selector` (
+   `service_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
NOT NULL,
+   `leader_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
NOT NULL,
+   `last_seen_active` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+   PRIMARY KEY (`service_id`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci 
COMMENT = 'selector db'
+
+


Reply via email to