This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new da4c77e2cd [INLONG-9920][Audit] Audit-service add codes of source (#9941)
da4c77e2cd is described below

commit da4c77e2cd6ccd80949139c2ba59a08281a3d45e
Author: doleyzi <[email protected]>
AuthorDate: Tue Apr 9 14:08:21 2024 +0800

    [INLONG-9920][Audit] Audit-service add codes of source (#9941)
---
 .../src/main/java/config/SqlConstants.java         |  37 ---
 .../apache/inlong/audit}/channel/DataQueue.java    |   5 +-
 .../inlong/audit}/config/ConfigConstants.java      |  11 +-
 .../apache/inlong/audit}/config/Configuration.java |   7 +-
 .../apache/inlong/audit/config/SqlConstants.java   |  80 ++++++
 .../apache/inlong/audit}/entities/AuditCycle.java  |   2 +-
 .../apache/inlong/audit}/entities/Request.java     |   2 +-
 .../inlong/audit}/entities/SourceConfig.java       |  26 +-
 .../inlong/audit}/entities/StartEndTime.java       |   2 +-
 .../apache/inlong/audit}/entities/StatData.java    |   2 +-
 .../audit/selector}/ElectorChangeListenerImpl.java |   5 +-
 .../inlong/audit/selector}/api/Selector.java       |   2 +-
 .../selector}/api/SelectorChangeListener.java      |   2 +-
 .../inlong/audit/selector}/api/SelectorConfig.java |   2 +-
 .../audit/selector}/api/SelectorFactory.java       |   4 +-
 .../inlong/audit/selector}/impl/DBDataSource.java  |  39 ++-
 .../inlong/audit/selector}/impl/SelectorImpl.java  |  28 +-
 .../inlong/audit/selector}/task/DBMonitorTask.java |   7 +-
 .../org/apache/inlong/audit/source/JdbcSource.java | 314 +++++++++++++++++++++
 .../src/main/java/source/AbstractSource.java       | 125 --------
 .../src/main/java/source/SourceStat.java           |  30 --
 inlong-audit/pom.xml                               |   6 +
 22 files changed, 483 insertions(+), 255 deletions(-)

diff --git a/inlong-audit/audit-service/src/main/java/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/config/SqlConstants.java
deleted file mode 100644
index a7164f2bdf..0000000000
--- a/inlong-audit/audit-service/src/main/java/config/SqlConstants.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package config;
-
-/**
- * Sql constants
- */
-public class SqlConstants {
-
-    // HA selector sql
-    public static final String SELECTOR_SQL =
-            "insert ignore into {0} (service_id, leader_id, last_seen_active) values (''{1}'', ''{2}'', now()) on duplicate key update leader_id = if(last_seen_active < now() - interval # second, values(leader_id), leader_id),last_seen_active = if(leader_id = values(leader_id), values(last_seen_active), last_seen_active)";
-    public static final String REPLACE_LEADER_SQL =
-            "replace into {0} ( service_id, leader_id, last_seen_active ) values (''{1}'', ''#'', now())";
-    public static final String RELEASE_SQL = "delete from {0} where service_id=''{1}'' and leader_id= ''{2}''";
-    public static final String IS_LEADER_SQL =
-            "select count(*) as is_leader from {0} where service_id=''{1}'' and leader_id=''{2}''";
-    public static final String SEARCH_CURRENT_LEADER_SQL =
-            "select leader_id as leader from {0} where service_id=''{1}''";
-    public static final String SELECT_TEST_SQL = "SELECT 1 ";
-
-}
diff --git a/inlong-audit/audit-service/src/main/java/channel/DataQueue.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/channel/DataQueue.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
index 46b3d9068f..3169e9d377 100644
--- a/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package channel;
+package org.apache.inlong.audit.channel;
+
+import org.apache.inlong.audit.entities.StatData;
 
-import entities.StatData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
similarity index 93%
rename from inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index 42ef2d7046..bc4f0f9326 100644
--- a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package config;
+package org.apache.inlong.audit.config;
 
 /**
  * Config constants
@@ -112,6 +112,15 @@ public class ConfigConstants {
     public static final String PREP_STMT_CACHE_SIZE = "prepStmtCacheSize";
     public static final String PREP_STMT_CACHE_SQL_LIMIT = "prepStmtCacheSqlLimit";
 
+    public static final String KEY_CACHE_PREP_STMTS = "cache.prep.stmts";
+    public static final boolean DEFAULT_CACHE_PREP_STMTS = true;
+
+    public static final String KEY_PREP_STMT_CACHE_SIZE = "prep.stmt.cache.size";
+    public static final int DEFAULT_PREP_STMT_CACHE_SIZE = 250;
+
+    public static final String KEY_PREP_STMT_CACHE_SQL_LIMIT = "prep.stmt.cache.sql.limit";
+    public static final int DEFAULT_PREP_STMT_CACHE_SQL_LIMIT = 2048;
+
     public static final int MAX_INIT_COUNT = 2;
     public static final int RANDOM_BOUND = 10;
 }
diff --git a/inlong-audit/audit-service/src/main/java/config/Configuration.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
similarity index 92%
rename from inlong-audit/audit-service/src/main/java/config/Configuration.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
index 56a50f8266..c04aaf6aac 100644
--- a/inlong-audit/audit-service/src/main/java/config/Configuration.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package config;
+package org.apache.inlong.audit.config;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +72,11 @@ public class Configuration {
         return value == null ? defaultValue : value.toString();
     }
 
+    public boolean get(String key, boolean defaultValue) {
+        Object value = properties.get(key);
+        return value == null ? defaultValue : (Boolean) value;
+    }
+
     /**
      * @param key
      * @param defaultValue
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
new file mode 100644
index 0000000000..a64cd2234d
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.config;
+
+/**
+ * Sql constants
+ */
+public class SqlConstants {
+
+    // HA selector sql
+    public static final String SELECTOR_SQL =
+            "insert ignore into {0} (service_id, leader_id, last_seen_active) values (''{1}'', ''{2}'', now()) on duplicate key update leader_id = if(last_seen_active < now() - interval # second, values(leader_id), leader_id),last_seen_active = if(leader_id = values(leader_id), values(last_seen_active), last_seen_active)";
+    public static final String REPLACE_LEADER_SQL =
+            "replace into {0} ( service_id, leader_id, last_seen_active ) values (''{1}'', ''#'', now())";
+    public static final String RELEASE_SQL = "delete from {0} where service_id=''{1}'' and leader_id= ''{2}''";
+    public static final String IS_LEADER_SQL =
+            "select count(*) as is_leader from {0} where service_id=''{1}'' and leader_id=''{2}''";
+    public static final String SEARCH_CURRENT_LEADER_SQL =
+            "select leader_id as leader from {0} where service_id=''{1}''";
+    public static final String SELECT_TEST_SQL = "SELECT 1 ";
+
+    // ClickHouse query sql
+    public static final String KEY_CLICKHOUSE_SOURCE_QUERY_SQL = "clickhouse.source.query.sql";
+    public static final String DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL =
+            "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+                    "    , sum(cnt) AS cnt, sum(size) AS size\n" +
+                    "    , sum(delay) AS delay\n" +
+                    "FROM (\n" +
+                    "    SELECT max(audit_version), ip, docker_id, thread_id\n" +
+                    "        , inlong_group_id, inlong_stream_id, audit_id, audit_tag, cnt\n" +
+                    "        , size, delay\n" +
+                    "    FROM (\n" +
+                    "        SELECT audit_version, ip, docker_id, thread_id, inlong_group_id\n" +
+                    "            , inlong_stream_id, audit_id, audit_tag, sum(count) AS cnt\n" +
+                    "            , sum(size) AS size, sum(delay) AS delay\n" +
+                    "        FROM (\n" +
+                    "            SELECT audit_version, docker_id, thread_id, sdk_ts, packet_id\n" +
+                    "                , log_ts, ip, inlong_group_id, inlong_stream_id, audit_id\n" +
+                    "                , audit_tag, count, size, delay\n" +
+                    "            FROM audit_data \n" +
+                    "            WHERE log_ts BETWEEN ? AND ? \n" +
+                    "                AND audit_id = ? \n" +
+                    "            GROUP BY audit_version, docker_id, thread_id, sdk_ts, packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, count, size, delay\n"
+                    +
+                    "        ) t1\n" +
+                    "        GROUP BY audit_version, ip, docker_id, thread_id, inlong_group_id, inlong_stream_id, audit_id, audit_tag\n"
+                    +
+                    "    ) t2\n" +
+                    "    GROUP BY ip, docker_id, thread_id, inlong_group_id, inlong_stream_id, audit_id, audit_tag, cnt, size, delay\n"
+                    +
+                    ") t3\n" +
+                    "GROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag";
+
+    // Mysql query sql
+    public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL = "mysql.query.temp.sql";
+    public static final String DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL =
+            "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+                    ", sum(count) AS cnt, sum(size) AS size\n" +
+                    ", sum(delay) AS delay\n" +
+                    "FROM audit_data_temp\n" +
+                    "WHERE log_ts BETWEEN ?  AND ? \n" +
+                    "AND audit_id = ? \n" +
+                    "GROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag";
+
+}
diff --git a/inlong-audit/audit-service/src/main/java/entities/AuditCycle.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditCycle.java
similarity index 96%
rename from inlong-audit/audit-service/src/main/java/entities/AuditCycle.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditCycle.java
index 0b0bb82ce5..8420bdcaeb 100644
--- a/inlong-audit/audit-service/src/main/java/entities/AuditCycle.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditCycle.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package entities;
+package org.apache.inlong.audit.entities;
 
 /**
  * Audit cycle
diff --git a/inlong-audit/audit-service/src/main/java/entities/Request.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Request.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/entities/Request.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Request.java
index 0e9ec8a4da..67954f1c72 100644
--- a/inlong-audit/audit-service/src/main/java/entities/Request.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Request.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package entities;
+package org.apache.inlong.audit.entities;
 
 import lombok.Data;
 
diff --git a/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
similarity index 57%
rename from inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
index f26ea02566..428ca0389f 100644
--- a/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package entities;
+package org.apache.inlong.audit.entities;
 
 import lombok.Data;
 
@@ -26,12 +26,26 @@ import lombok.Data;
 public class SourceConfig {
 
     private AuditCycle auditCycle;
-    private String sourceTable;
-    private int beforeTimes;
+    private String querySql;
+    private int statBackTimes;
+    private final String driverClassName;
+    private final String jdbcUrl;
+    private final String username;
+    private final String password;
 
-    public SourceConfig(AuditCycle auditCycle, String sourceTable, int beforeTimes) {
+    public SourceConfig(AuditCycle auditCycle,
+            String querySql,
+            int statBackTimes,
+            String driverClassName,
+            String jdbcUrl,
+            String username,
+            String password) {
         this.auditCycle = auditCycle;
-        this.sourceTable = sourceTable;
-        this.beforeTimes = beforeTimes;
+        this.querySql = querySql;
+        this.statBackTimes = statBackTimes;
+        this.driverClassName = driverClassName;
+        this.jdbcUrl = jdbcUrl;
+        this.username = username;
+        this.password = password;
     }
 }
diff --git a/inlong-audit/audit-service/src/main/java/entities/StartEndTime.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StartEndTime.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/entities/StartEndTime.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StartEndTime.java
index 7b7944320e..d7bba55679 100644
--- a/inlong-audit/audit-service/src/main/java/entities/StartEndTime.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StartEndTime.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package entities;
+package org.apache.inlong.audit.entities;
 
 import lombok.Data;
 
diff --git a/inlong-audit/audit-service/src/main/java/entities/StatData.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
similarity index 96%
rename from inlong-audit/audit-service/src/main/java/entities/StatData.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
index aefc828efe..b5f450b27d 100644
--- a/inlong-audit/audit-service/src/main/java/entities/StatData.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package entities;
+package org.apache.inlong.audit.entities;
 
 import lombok.Data;
 
diff --git a/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
similarity index 91%
rename from inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
index 9017caacff..d579250975 100644
--- a/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package elector;
+package org.apache.inlong.audit.selector;
+
+import org.apache.inlong.audit.selector.api.SelectorChangeListener;
 
-import elector.api.SelectorChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/Selector.java
similarity index 96%
rename from inlong-audit/audit-service/src/main/java/elector/api/Selector.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/Selector.java
index 5e9e564762..b9a66c7b96 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/Selector.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package elector.api;
+package org.apache.inlong.audit.selector.api;
 
 /**
  * Selector
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
index a4af6a9598..f914269150 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package elector.api;
+package org.apache.inlong.audit.selector.api;
 
 /**
  * Selector change listener
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
similarity index 98%
rename from inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
index d037985332..b4dd399021 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package elector.api;
+package org.apache.inlong.audit.selector.api;
 
 import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
similarity index 90%
rename from inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
index 42ed31a724..d7a38fbf39 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package elector.api;
+package org.apache.inlong.audit.selector.api;
 
-import elector.impl.SelectorImpl;
+import org.apache.inlong.audit.selector.impl.SelectorImpl;
 
 /**
  * Selector factory
diff --git a/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
similarity index 88%
rename from inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
index 91eef3156f..2751321345 100644
--- a/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
-package elector.impl;
+package org.apache.inlong.audit.selector.impl;
+
+import org.apache.inlong.audit.config.ConfigConstants;
+import org.apache.inlong.audit.config.SqlConstants;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
 
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
-import elector.api.SelectorConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,28 +32,17 @@ import java.sql.ResultSet;
 import java.text.MessageFormat;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static config.ConfigConstants.CACHE_PREP_STMTS;
-import static config.ConfigConstants.MAX_INIT_COUNT;
-import static config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
-import static config.SqlConstants.IS_LEADER_SQL;
-import static config.SqlConstants.RELEASE_SQL;
-import static config.SqlConstants.REPLACE_LEADER_SQL;
-import static config.SqlConstants.SEARCH_CURRENT_LEADER_SQL;
-import static config.SqlConstants.SELECTOR_SQL;
-import static config.SqlConstants.SELECT_TEST_SQL;
-
 /**
  * DB data source
  */
 public class DBDataSource {
 
     private static final Logger logger = LoggerFactory.getLogger(DBDataSource.class);
-    private String selectorSql = SELECTOR_SQL;
-    private String replaceLeaderSql = REPLACE_LEADER_SQL;
-    private String reLeaseSql = RELEASE_SQL;
-    private String isLeaderSql = IS_LEADER_SQL;
-    private String searchCurrentLeaderSql = SEARCH_CURRENT_LEADER_SQL;
+    private String selectorSql = SqlConstants.SELECTOR_SQL;
+    private String replaceLeaderSql = SqlConstants.REPLACE_LEADER_SQL;
+    private String reLeaseSql = SqlConstants.RELEASE_SQL;
+    private String isLeaderSql = SqlConstants.IS_LEADER_SQL;
+    private String searchCurrentLeaderSql = SqlConstants.SEARCH_CURRENT_LEADER_SQL;
     private final SelectorConfig selectorConfig;
     private HikariDataSource datasource;
     public AtomicInteger getConnectionFailTimes;
@@ -90,7 +82,7 @@ public class DBDataSource {
         boolean initSucc = false;
         int initCount = 0;
 
-        while (!initSucc && initCount < MAX_INIT_COUNT) {
+        while (!initSucc && initCount < ConfigConstants.MAX_INIT_COUNT) {
             try {
                 ++initCount;
                 if (datasource == null || datasource.isClosed()) {
@@ -104,11 +96,12 @@ public class DBDataSource {
                     config.setAutoCommit(true);
                     config.setConnectionTimeout((long) selectorConfig.getConnectionTimeout());
                     config.setMaxLifetime((long) selectorConfig.getMaxLifetime());
-                    config.addDataSourceProperty(CACHE_PREP_STMTS, selectorConfig.getCachePrepStmts());
-                    config.addDataSourceProperty(PREP_STMT_CACHE_SIZE, selectorConfig.getPrepStmtCacheSize());
-                    config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+                    config.addDataSourceProperty(ConfigConstants.CACHE_PREP_STMTS, selectorConfig.getCachePrepStmts());
+                    config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SIZE,
+                            selectorConfig.getPrepStmtCacheSize());
+                    config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT,
                             selectorConfig.getPrepStmtCacheSqlLimit());
-                    config.setConnectionTestQuery(SELECT_TEST_SQL);
+                    config.setConnectionTestQuery(SqlConstants.SELECT_TEST_SQL);
                     datasource = new HikariDataSource(config);
                 }
 
diff --git a/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
similarity index 88%
rename from inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
index b84051fccb..45352be5c2 100644
--- a/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package elector.impl;
+package org.apache.inlong.audit.selector.impl;
+
+import org.apache.inlong.audit.config.ConfigConstants;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.selector.api.Selector;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.task.DBMonitorTask;
 
-import config.Configuration;
-import elector.api.Selector;
-import elector.api.SelectorConfig;
-import elector.task.DBMonitorTask;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,12 +32,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static config.ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL;
-import static config.ConfigConstants.DEFAULT_SELECTOR_THREAD_POOL_SIZE;
-import static config.ConfigConstants.KEY_RELEASE_LEADER_INTERVAL;
-import static config.ConfigConstants.KEY_SELECTOR_THREAD_POOL_SIZE;
-import static config.ConfigConstants.RANDOM_BOUND;
-
 /**
  * Elector Impl
  */
@@ -53,8 +49,8 @@ public class SelectorImpl extends Selector {
         this.selectorConfig = selectorConfig;
         this.dbDataSource = new DBDataSource(selectorConfig);
         fixedThreadPool = Executors.newFixedThreadPool(Configuration.getInstance().get(
-                KEY_SELECTOR_THREAD_POOL_SIZE,
-                DEFAULT_SELECTOR_THREAD_POOL_SIZE));
+                ConfigConstants.KEY_SELECTOR_THREAD_POOL_SIZE,
+                ConfigConstants.DEFAULT_SELECTOR_THREAD_POOL_SIZE));
     }
 
     /**
@@ -97,8 +93,8 @@ public class SelectorImpl extends Selector {
             }
 
         try {
-            TimeUnit.SECONDS.sleep(Configuration.getInstance().get(KEY_RELEASE_LEADER_INTERVAL,
-                    DEFAULT_RELEASE_LEADER_INTERVAL));
+            TimeUnit.SECONDS.sleep(Configuration.getInstance().get(ConfigConstants.KEY_RELEASE_LEADER_INTERVAL,
+                    ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL));
         } catch (Exception exception) {
             logger.error("Exception :{}", exception.getMessage());
         }
@@ -195,7 +191,7 @@ public class SelectorImpl extends Selector {
 
                         isLeader = false;
                         sleepTime = selectorConfig.getTryToBeLeaderInterval()
-                                + random.nextInt(RANDOM_BOUND);
+                                + random.nextInt(ConfigConstants.RANDOM_BOUND);
                     }
                 }
 
diff --git a/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
similarity index 92%
rename from inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
index 8ccf92ab3b..c1dc8e3d5c 100644
--- a/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package elector.task;
+package org.apache.inlong.audit.selector.task;
+
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.impl.DBDataSource;
 
-import elector.api.SelectorConfig;
-import elector.impl.DBDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
new file mode 100644
index 0000000000..accd22ebbc
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import org.apache.inlong.audit.channel.DataQueue;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.entities.StartEndTime;
+import org.apache.inlong.audit.entities.StatData;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_IDS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_IDS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static org.apache.inlong.audit.entities.AuditCycle.DAY;
+import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
+
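+/**
+ * JDBC source: periodically queries audit statistics from a JDBC data source
+ * (pooled with HikariCP) and pushes every result row into a {@link DataQueue}.
+ */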
+@Data
+public class JdbcSource {
+
+    // Class-wide logger.
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
+    // One scheduler per stat-back-time index; filled in start(), shut down in destroy().
+    private final ConcurrentHashMap<Integer, ScheduledExecutorService> 
statTimers = new ConcurrentHashMap<>();
+    // Sink that receives one StatData per row returned by the stat query.
+    private DataQueue dataQueue;
+    // Audit ids to stat; parsed in the constructor from the ';'-separated KEY_AUDIT_IDS setting.
+    private List<String> auditIds;
+    // NOTE(review): never read or assigned in this class body; only reachable through the
+    // accessors generated by Lombok's @Data. Presumably meant as a statement timeout - confirm.
+    private int querySqlTimeout;
+    // Connection pool; assigned by createDataSource().
+    private DataSource dataSource;
+    // NOTE(review): never read or assigned in this class body; the query text that is actually
+    // executed comes from sourceConfig.getQuerySql().
+    private String querySql;
+    // Connection settings, query SQL, audit cycle and stat-back-times for this source.
+    private SourceConfig sourceConfig;
+
+    // Pattern used to render the start/end timestamps bound into the stat query.
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    // Exclusive upper bound of the minute loop in getStatCycleOfMinute (minutes per hour).
+    private static final int MAX_MINUTE = 60;
+    /**
+     * Creates a source that reads from the database described by {@code sourceConfig}
+     * and publishes its results to {@code dataQueue}.
+     *
+     * <p>The audit ids to stat are read from the {@code KEY_AUDIT_IDS} setting, a
+     * {@code ';'}-separated list. Blank entries are discarded.
+     *
+     * @param dataQueue queue that receives the queried stat rows
+     * @param sourceConfig connection and query settings of this source
+     */
+    public JdbcSource(DataQueue dataQueue, SourceConfig sourceConfig) {
+        this.dataQueue = dataQueue;
+        this.sourceConfig = sourceConfig;
+        String configuredIds = Configuration.getInstance().get(KEY_AUDIT_IDS, DEFAULT_AUDIT_IDS);
+        // "".split(";") yields [""], and "1;;2" yields an empty element: without this filter a
+        // blank id would be queried and the empty-list guard in StatServer#statByStep could never fire.
+        List<String> parsedIds = new ArrayList<>(Arrays.asList(configuredIds.split(";")));
+        parsedIds.removeIf(id -> id.trim().isEmpty());
+        this.auditIds = parsedIds;
+    }
+
+    /**
+     * Initializes this source. Currently this only builds the JDBC connection pool
+     * by delegating to {@link #createDataSource()}.
+     */
+    public void init() {
+        createDataSource();
+    }
+
+    /**
+     * Initializes the data source and schedules the periodic stat tasks.
+     *
+     * <p>One {@link StatServer} is scheduled for every stat-back-time index from 1 (inclusive) up to
+     * {@code sourceConfig.getStatBackTimes()} (exclusive). Each index owns a single-threaded scheduler;
+     * its first run is delayed by {@code index} minutes and later runs follow with a fixed delay of the
+     * stat interval, also in minutes. The interval is the configured
+     * {@code KEY_SOURCE_DB_STAT_INTERVAL}, except for the {@code DAY} audit cycle, which uses
+     * {@code HOUR.getValue()}.
+     */
+    public void start() {
+        init();
+        int statInterval = Configuration.getInstance().get(KEY_SOURCE_DB_STAT_INTERVAL,
+                DEFAULT_SOURCE_DB_STAT_INTERVAL);
+        if (sourceConfig.getAuditCycle() == DAY) {
+            statInterval = HOUR.getValue();
+        }
+        // NOTE(review): the bound is exclusive, so getStatBackTimes() == N schedules indexes 1..N-1.
+        // Confirm that this is intended rather than an off-by-one.
+        int statBackTimesLimit = sourceConfig.getStatBackTimes();
+        for (int statBackTime = 1; statBackTime < statBackTimesLimit; statBackTime++) {
+            ScheduledExecutorService timer =
+                    statTimers.computeIfAbsent(statBackTime, k -> Executors.newSingleThreadScheduledExecutor());
+            // Staggered initial delay: index i starts after i minutes.
+            timer.scheduleWithFixedDelay(new StatServer(statBackTime),
+                    statBackTime,
+                    statInterval, TimeUnit.MINUTES);
+        }
+    }
+
+    /**
+     * Builds the stat windows that cover one clock hour, split into slices of {@code dataCycle} minutes.
+     *
+     * <p>The hour is "now minus {@code hoursAgo} hours". A slice that begins at minute {@code m} starts at
+     * {@code HH:m:00} and ends at {@code HH:(m + dataCycle - 1):00}; both are formatted as
+     * {@code yyyy-MM-dd HH:mm:ss}.
+     *
+     * @param hoursAgo how many hours to go back from the current time
+     * @param dataCycle slice length in minutes; must be positive
+     * @return the slices in chronological order, or an empty list when {@code dataCycle} is not positive
+     */
+    public List<StartEndTime> getStatCycleOfMinute(int hoursAgo, int dataCycle) {
+        List<StartEndTime> statCycleList = new LinkedList<>();
+        if (dataCycle <= 0) {
+            // A non-positive step would never let the loop below reach MAX_MINUTE.
+            LOG.error("Invalid data cycle:{}, it must be positive", dataCycle);
+            return statCycleList;
+        }
+        // Read the clock once so that every slice belongs to the same hour, even if the wall clock
+        // crosses an hour boundary while the list is being built.
+        Calendar baseHour = Calendar.getInstance();
+        baseHour.add(Calendar.HOUR_OF_DAY, -hoursAgo);
+        baseHour.set(Calendar.SECOND, 0);
+        // SimpleDateFormat is not thread-safe, so it stays local to this call.
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+        for (int minute = 0; minute < MAX_MINUTE; minute += dataCycle) {
+            // Work on a copy: a lenient roll-over of the end minute must not leak into the next slice.
+            Calendar calendar = (Calendar) baseHour.clone();
+            StartEndTime statCycle = new StartEndTime();
+
+            calendar.set(Calendar.MINUTE, minute);
+            statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+            calendar.set(Calendar.MINUTE, minute + dataCycle - 1);
+            statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+            statCycleList.add(statCycle);
+        }
+        return statCycleList;
+    }
+
+    /**
+     * Builds the single stat window that covers one calendar day.
+     *
+     * <p>The day is "today minus {@code daysAgo} days". The window starts at {@code 00:00:00} and ends at
+     * {@code 23:59:00} of that day; both are formatted as {@code yyyy-MM-dd HH:mm:ss}.
+     *
+     * @param daysAgo how many days to go back from the current date
+     * @return a mutable list holding exactly one window
+     */
+    public List<StartEndTime> getStatCycleOfDay(int daysAgo) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DATE, -daysAgo);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+        StartEndTime statCycle = new StartEndTime();
+        statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+        // Seconds stay at 0, so the end of the window is 23:59:00.
+        calendar.set(Calendar.HOUR_OF_DAY, 23);
+        calendar.set(Calendar.MINUTE, 59);
+        statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+
+        // A plain ArrayList instead of double-brace initialization: the latter creates an anonymous
+        // subclass that keeps a hidden reference to the enclosing JdbcSource.
+        List<StartEndTime> statCycleList = new ArrayList<>(1);
+        statCycleList.add(statCycle);
+        return statCycleList;
+    }
+
+    /**
+     * Builds a HikariCP connection pool and stores it in {@code dataSource}.
+     *
+     * <p>Driver class, JDBC URL, user name and password come from {@code sourceConfig}. Connection
+     * timeout, prepared-statement cache properties and maximum pool size come from the global
+     * {@link Configuration}, each with its {@code DEFAULT_*} fallback.
+     */
+    protected void createDataSource() {
+        Configuration globalConfig = Configuration.getInstance();
+        HikariConfig hikariConfig = new HikariConfig();
+
+        // Connection identity of this source
+        hikariConfig.setDriverClassName(sourceConfig.getDriverClassName());
+        hikariConfig.setJdbcUrl(sourceConfig.getJdbcUrl());
+        hikariConfig.setUsername(sourceConfig.getUsername());
+        hikariConfig.setPassword(sourceConfig.getPassword());
+
+        // Pool tuning shared by all sources
+        hikariConfig.setConnectionTimeout(
+                globalConfig.get(KEY_DATASOURCE_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT));
+        hikariConfig.addDataSourceProperty(CACHE_PREP_STMTS,
+                globalConfig.get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS));
+        hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
+                globalConfig.get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE));
+        hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+                globalConfig.get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
+        hikariConfig.setMaximumPoolSize(
+                globalConfig.get(KEY_DATASOURCE_POOL_SIZE, DEFAULT_DATASOURCE_POOL_SIZE));
+
+        dataSource = new HikariDataSource(hikariConfig);
+    }
+
+    /**
+     * Destroys this source: stops all stat schedulers and closes the connection pool.
+     *
+     * <p>Schedulers are shut down without waiting for running tasks. A query that is still in flight
+     * when the pool closes fails with an exception that {@code StatServer#query} logs.
+     */
+    public void destroy() {
+        for (Map.Entry<Integer, ScheduledExecutorService> timer : statTimers.entrySet()) {
+            timer.getValue().shutdown();
+        }
+        // Forget the terminated schedulers; otherwise a later start() would reuse them and every
+        // scheduleWithFixedDelay call would be rejected.
+        statTimers.clear();
+
+        // Release the pooled connections; without this the pool outlives the source.
+        if (dataSource instanceof HikariDataSource) {
+            ((HikariDataSource) dataSource).close();
+        }
+    }
+
+    /**
+     * Stat server
+     */
+    class StatServer implements Runnable, AutoCloseable {
+
+        private final int statBackTimes;
+
+        public StatServer(int statBackTimes) {
+            this.statBackTimes = statBackTimes;
+        }
+
+        public void run() {
+            long currentTimestamp = System.currentTimeMillis();
+            LOG.info("Stat source data at {},stat back times:{}", 
currentTimestamp, statBackTimes);
+
+            statByStep();
+
+            long timeCost = System.currentTimeMillis() - currentTimestamp;
+            LOG.info("Stat source data cost time:{}ms,stat back times:{}", 
timeCost, statBackTimes);
+        }
+
+        /**
+         * Stat by step
+         */
+        public void statByStep() {
+            if (auditIds.isEmpty()) {
+                return;
+            }
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String auditId : auditIds) {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() 
-> {
+                    aggregate(auditId);
+                });
+                futures.add(future);
+            }
+            CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()])).join();
+        }
+
+        /**
+         * Aggregate
+         *
+         * @param auditId
+         */
+        public void aggregate(String auditId) {
+            List<StartEndTime> statCycleList = sourceConfig.getAuditCycle() == 
DAY ? getStatCycleOfDay(statBackTimes)
+                    : getStatCycleOfMinute(statBackTimes, 
sourceConfig.getAuditCycle().getValue());
+            for (StartEndTime statCycle : statCycleList) {
+                long currentTimestamp = System.currentTimeMillis();
+                query(statCycle.getStartTime(), statCycle.getEndTime(), 
auditId);
+                long timeCost = System.currentTimeMillis() - currentTimestamp;
+                LOG.info("[{}]-[{}],{},stat back times:{},audit 
id:{},cost:{}ms",
+                        statCycle.getStartTime(), statCycle.getEndTime(),
+                        sourceConfig.getAuditCycle(),
+                        statBackTimes, auditId, timeCost);
+            }
+        }
+
+        /**
+         * Query
+         *
+         * @param startTime
+         * @param endTime
+         * @param auditId
+         */
+        public void query(String startTime, String endTime, String auditId) {
+            try (Connection connection = dataSource.getConnection();
+                    PreparedStatement pstat = 
connection.prepareStatement(sourceConfig.getQuerySql())) {
+                if (connection.isClosed()) {
+                    createDataSource();
+                }
+                pstat.setString(1, startTime);
+                pstat.setString(2, endTime);
+                pstat.setString(3, auditId);
+                try (ResultSet resultSet = pstat.executeQuery()) {
+                    while (resultSet.next()) {
+                        String inlongGroupID = resultSet.getString(1);
+                        String InlongStreamID = resultSet.getString(2);
+                        String AuditId = resultSet.getString(3);
+                        String AuditTag = resultSet.getString(4);
+                        long count = resultSet.getLong(5);
+                        long size = resultSet.getLong(6);
+                        long delay = resultSet.getLong(7);
+                        StatData data = new StatData();
+                        data.setLogTs(startTime);
+                        data.setInlongGroupId(inlongGroupID);
+                        data.setInlongStreamId(InlongStreamID);
+                        data.setAuditId(AuditId);
+                        data.setAuditTag(AuditTag);
+                        data.setCount(count);
+                        data.setSize(size);
+                        data.setDelay(delay);
+                        dataQueue.push(data);
+                    }
+                } catch (SQLException sqlException) {
+                    LOG.error("Query has SQL exception! ", sqlException);
+                }
+            } catch (Exception exception) {
+                LOG.error("Query has exception! ", exception);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/source/AbstractSource.java 
b/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
deleted file mode 100644
index b2c8cc8f2e..0000000000
--- a/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package source;
-
-import channel.DataQueue;
-import entities.SourceConfig;
-import entities.StartEndTime;
-import lombok.Data;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-
-import java.sql.SQLException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * Abstract source
- */
-@Data
-public class AbstractSource {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSource.class);
-    protected final ConcurrentHashMap<Integer, ScheduledExecutorService> 
statTimers = new ConcurrentHashMap<>();
-    protected DataQueue dataQueue;
-    protected List<String> auditIds;
-    protected int querySqlTimeout;
-    protected DataSource dataSource;
-    protected String querySql;
-    protected SourceConfig sourceConfig;
-
-    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-
-    public AbstractSource(DataQueue dataQueue) {
-        this.dataQueue = dataQueue;
-    }
-
-    /**
-     * Get stat cycle of minute
-     *
-     * @param beforeHour
-     * @param dataCycle
-     * @return
-     */
-    public List<StartEndTime> getStatCycleMinute(int beforeHour, int 
dataCycle) {
-        List<StartEndTime> statCycleList = new LinkedList<>();
-        for (int step = 0; step < 60; step = step + dataCycle) {
-            Calendar calendar = Calendar.getInstance();
-            calendar.add(Calendar.HOUR_OF_DAY, -beforeHour);
-
-            calendar.set(Calendar.MINUTE, step);
-            calendar.set(Calendar.SECOND, 0);
-            SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-            StartEndTime statCycle = new StartEndTime();
-            statCycle.setStartTime(dateFormat.format(calendar.getTime()));
-
-            calendar.set(Calendar.MINUTE, step + dataCycle - 1);
-            calendar.set(Calendar.SECOND, 0);
-            statCycle.setEndTime(dateFormat.format(calendar.getTime()));
-            statCycleList.add(statCycle);
-        }
-        return statCycleList;
-    }
-
-    /**
-     * Get stat cycle of day
-     *
-     * @param beforeDay
-     * @return
-     */
-    public List<StartEndTime> getStatCycleDay(int beforeDay) {
-        StartEndTime statCycle = new StartEndTime();
-        Calendar calendar = Calendar.getInstance();
-        calendar.add(Calendar.DATE, -beforeDay);
-        calendar.set(Calendar.HOUR_OF_DAY, 0);
-        calendar.set(Calendar.MINUTE, 0);
-        calendar.set(Calendar.SECOND, 0);
-        calendar.set(Calendar.MILLISECOND, 0);
-
-        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-        statCycle.setStartTime(dateFormat.format(calendar.getTime()));
-
-        calendar.set(Calendar.HOUR_OF_DAY, 23);
-        calendar.set(Calendar.MINUTE, 59);
-        statCycle.setEndTime(dateFormat.format(calendar.getTime()));
-        return new ArrayList<StartEndTime>() {
-
-            {
-                add(statCycle);
-            }
-        };
-    }
-
-    /**
-     * Destory
-     */
-    public void destory() {
-        try {
-            dataSource.getConnection().close();
-        } catch (SQLException exception) {
-            LOG.error(exception.getMessage());
-        }
-    }
-}
diff --git a/inlong-audit/audit-service/src/main/java/source/SourceStat.java 
b/inlong-audit/audit-service/src/main/java/source/SourceStat.java
deleted file mode 100644
index 4477d25532..0000000000
--- a/inlong-audit/audit-service/src/main/java/source/SourceStat.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package source;
-
-/**
- * Source stat interface.
- */
-public interface SourceStat extends Runnable {
-
-    public void statByStep();
-
-    public void aggregate(String auditId);
-
-    public void query(String startTime, String endTime, String auditId);
-}
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index ea25c3daaf..54ebd8fcc2 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -84,6 +84,12 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+            <version>${HikariCP.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
 </project>

Reply via email to