This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new da4c77e2cd [INLONG-9920][Audit] Audit-service add codes of source (#9941)
da4c77e2cd is described below
commit da4c77e2cd6ccd80949139c2ba59a08281a3d45e
Author: doleyzi <[email protected]>
AuthorDate: Tue Apr 9 14:08:21 2024 +0800
[INLONG-9920][Audit] Audit-service add codes of source (#9941)
---
.../src/main/java/config/SqlConstants.java | 37 ---
.../apache/inlong/audit}/channel/DataQueue.java | 5 +-
.../inlong/audit}/config/ConfigConstants.java | 11 +-
.../apache/inlong/audit}/config/Configuration.java | 7 +-
.../apache/inlong/audit/config/SqlConstants.java | 80 ++++++
.../apache/inlong/audit}/entities/AuditCycle.java | 2 +-
.../apache/inlong/audit}/entities/Request.java | 2 +-
.../inlong/audit}/entities/SourceConfig.java | 26 +-
.../inlong/audit}/entities/StartEndTime.java | 2 +-
.../apache/inlong/audit}/entities/StatData.java | 2 +-
.../audit/selector}/ElectorChangeListenerImpl.java | 5 +-
.../inlong/audit/selector}/api/Selector.java | 2 +-
.../selector}/api/SelectorChangeListener.java | 2 +-
.../inlong/audit/selector}/api/SelectorConfig.java | 2 +-
.../audit/selector}/api/SelectorFactory.java | 4 +-
.../inlong/audit/selector}/impl/DBDataSource.java | 39 ++-
.../inlong/audit/selector}/impl/SelectorImpl.java | 28 +-
.../inlong/audit/selector}/task/DBMonitorTask.java | 7 +-
.../org/apache/inlong/audit/source/JdbcSource.java | 314 +++++++++++++++++++++
.../src/main/java/source/AbstractSource.java | 125 --------
.../src/main/java/source/SourceStat.java | 30 --
inlong-audit/pom.xml | 6 +
22 files changed, 483 insertions(+), 255 deletions(-)
diff --git a/inlong-audit/audit-service/src/main/java/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/config/SqlConstants.java
deleted file mode 100644
index a7164f2bdf..0000000000
--- a/inlong-audit/audit-service/src/main/java/config/SqlConstants.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package config;
-
-/**
- * Sql constants
- */
-public class SqlConstants {
-
- // HA selector sql
- public static final String SELECTOR_SQL =
- "insert ignore into {0} (service_id, leader_id, last_seen_active)
values (''{1}'', ''{2}'', now()) on duplicate key update leader_id =
if(last_seen_active < now() - interval # second, values(leader_id),
leader_id),last_seen_active = if(leader_id = values(leader_id),
values(last_seen_active), last_seen_active)";
- public static final String REPLACE_LEADER_SQL =
- "replace into {0} ( service_id, leader_id, last_seen_active )
values (''{1}'', ''#'', now())";
- public static final String RELEASE_SQL = "delete from {0} where
service_id=''{1}'' and leader_id= ''{2}''";
- public static final String IS_LEADER_SQL =
- "select count(*) as is_leader from {0} where service_id=''{1}''
and leader_id=''{2}''";
- public static final String SEARCH_CURRENT_LEADER_SQL =
- "select leader_id as leader from {0} where service_id=''{1}''";
- public static final String SELECT_TEST_SQL = "SELECT 1 ";
-
-}
diff --git a/inlong-audit/audit-service/src/main/java/channel/DataQueue.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/channel/DataQueue.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
index 46b3d9068f..3169e9d377 100644
--- a/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package channel;
+package org.apache.inlong.audit.channel;
+
+import org.apache.inlong.audit.entities.StatData;
-import entities.StatData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
similarity index 93%
rename from inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index 42ef2d7046..bc4f0f9326 100644
--- a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package config;
+package org.apache.inlong.audit.config;
/**
* Config constants
@@ -112,6 +112,15 @@ public class ConfigConstants {
public static final String PREP_STMT_CACHE_SIZE = "prepStmtCacheSize";
public static final String PREP_STMT_CACHE_SQL_LIMIT =
"prepStmtCacheSqlLimit";
+ public static final String KEY_CACHE_PREP_STMTS = "cache.prep.stmts";
+ public static final boolean DEFAULT_CACHE_PREP_STMTS = true;
+
+ public static final String KEY_PREP_STMT_CACHE_SIZE =
"prep.stmt.cache.size";
+ public static final int DEFAULT_PREP_STMT_CACHE_SIZE = 250;
+
+ public static final String KEY_PREP_STMT_CACHE_SQL_LIMIT =
"prep.stmt.cache.sql.limit";
+ public static final int DEFAULT_PREP_STMT_CACHE_SQL_LIMIT = 2048;
+
public static final int MAX_INIT_COUNT = 2;
public static final int RANDOM_BOUND = 10;
}
diff --git a/inlong-audit/audit-service/src/main/java/config/Configuration.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
similarity index 92%
rename from inlong-audit/audit-service/src/main/java/config/Configuration.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
index 56a50f8266..c04aaf6aac 100644
--- a/inlong-audit/audit-service/src/main/java/config/Configuration.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package config;
+package org.apache.inlong.audit.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +72,11 @@ public class Configuration {
return value == null ? defaultValue : value.toString();
}
+ public boolean get(String key, boolean defaultValue) {
+ Object value = properties.get(key);
+ return value == null ? defaultValue : (Boolean) value;
+ }
+
/**
* @param key
* @param defaultValue
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
new file mode 100644
index 0000000000..a64cd2234d
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.config;
+
+/**
+ * Sql constants
+ */
+public class SqlConstants {
+
+ // HA selector sql
+ public static final String SELECTOR_SQL =
+ "insert ignore into {0} (service_id, leader_id, last_seen_active)
values (''{1}'', ''{2}'', now()) on duplicate key update leader_id =
if(last_seen_active < now() - interval # second, values(leader_id),
leader_id),last_seen_active = if(leader_id = values(leader_id),
values(last_seen_active), last_seen_active)";
+ public static final String REPLACE_LEADER_SQL =
+ "replace into {0} ( service_id, leader_id, last_seen_active )
values (''{1}'', ''#'', now())";
+ public static final String RELEASE_SQL = "delete from {0} where
service_id=''{1}'' and leader_id= ''{2}''";
+ public static final String IS_LEADER_SQL =
+ "select count(*) as is_leader from {0} where service_id=''{1}''
and leader_id=''{2}''";
+ public static final String SEARCH_CURRENT_LEADER_SQL =
+ "select leader_id as leader from {0} where service_id=''{1}''";
+ public static final String SELECT_TEST_SQL = "SELECT 1 ";
+
+ // ClickHouse query sql
+ public static final String KEY_CLICKHOUSE_SOURCE_QUERY_SQL =
"clickhouse.source.query.sql";
+ public static final String DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL =
+ "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+ " , sum(cnt) AS cnt, sum(size) AS size\n" +
+ " , sum(delay) AS delay\n" +
+ "FROM (\n" +
+ " SELECT max(audit_version), ip, docker_id,
thread_id\n" +
+ " , inlong_group_id, inlong_stream_id, audit_id,
audit_tag, cnt\n" +
+ " , size, delay\n" +
+ " FROM (\n" +
+ " SELECT audit_version, ip, docker_id, thread_id,
inlong_group_id\n" +
+ " , inlong_stream_id, audit_id, audit_tag,
sum(count) AS cnt\n" +
+ " , sum(size) AS size, sum(delay) AS delay\n" +
+ " FROM (\n" +
+ " SELECT audit_version, docker_id, thread_id,
sdk_ts, packet_id\n" +
+ " , log_ts, ip, inlong_group_id,
inlong_stream_id, audit_id\n" +
+ " , audit_tag, count, size, delay\n" +
+ " FROM audit_data \n" +
+ " WHERE log_ts BETWEEN ? AND ? \n" +
+ " AND audit_id = ? \n" +
+ " GROUP BY audit_version, docker_id, thread_id,
sdk_ts, packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id,
audit_tag, count, size, delay\n"
+ +
+ " ) t1\n" +
+ " GROUP BY audit_version, ip, docker_id, thread_id,
inlong_group_id, inlong_stream_id, audit_id, audit_tag\n"
+ +
+ " ) t2\n" +
+ " GROUP BY ip, docker_id, thread_id, inlong_group_id,
inlong_stream_id, audit_id, audit_tag, cnt, size, delay\n"
+ +
+ ") t3\n" +
+ "GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag";
+
+ // Mysql query sql
+ public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL =
"mysql.query.temp.sql";
+ public static final String DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL =
+ "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+ ", sum(count) AS cnt, sum(size) AS size\n" +
+ ", sum(delay) AS delay\n" +
+ "FROM audit_data_temp\n" +
+ "WHERE log_ts BETWEEN ? AND ? \n" +
+ "AND audit_id = ? \n" +
+ "GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag";
+
+}
diff --git a/inlong-audit/audit-service/src/main/java/entities/AuditCycle.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditCycle.java
similarity index 96%
rename from inlong-audit/audit-service/src/main/java/entities/AuditCycle.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditCycle.java
index 0b0bb82ce5..8420bdcaeb 100644
--- a/inlong-audit/audit-service/src/main/java/entities/AuditCycle.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/AuditCycle.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package entities;
+package org.apache.inlong.audit.entities;
/**
* Audit cycle
diff --git a/inlong-audit/audit-service/src/main/java/entities/Request.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Request.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/entities/Request.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Request.java
index 0e9ec8a4da..67954f1c72 100644
--- a/inlong-audit/audit-service/src/main/java/entities/Request.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/Request.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package entities;
+package org.apache.inlong.audit.entities;
import lombok.Data;
diff --git a/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
similarity index 57%
rename from inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
index f26ea02566..428ca0389f 100644
--- a/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package entities;
+package org.apache.inlong.audit.entities;
import lombok.Data;
@@ -26,12 +26,26 @@ import lombok.Data;
public class SourceConfig {
private AuditCycle auditCycle;
- private String sourceTable;
- private int beforeTimes;
+ private String querySql;
+ private int statBackTimes;
+ private final String driverClassName;
+ private final String jdbcUrl;
+ private final String username;
+ private final String password;
- public SourceConfig(AuditCycle auditCycle, String sourceTable, int
beforeTimes) {
+ public SourceConfig(AuditCycle auditCycle,
+ String querySql,
+ int statBackTimes,
+ String driverClassName,
+ String jdbcUrl,
+ String username,
+ String password) {
this.auditCycle = auditCycle;
- this.sourceTable = sourceTable;
- this.beforeTimes = beforeTimes;
+ this.querySql = querySql;
+ this.statBackTimes = statBackTimes;
+ this.driverClassName = driverClassName;
+ this.jdbcUrl = jdbcUrl;
+ this.username = username;
+ this.password = password;
}
}
diff --git a/inlong-audit/audit-service/src/main/java/entities/StartEndTime.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StartEndTime.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/entities/StartEndTime.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StartEndTime.java
index 7b7944320e..d7bba55679 100644
--- a/inlong-audit/audit-service/src/main/java/entities/StartEndTime.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StartEndTime.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package entities;
+package org.apache.inlong.audit.entities;
import lombok.Data;
diff --git a/inlong-audit/audit-service/src/main/java/entities/StatData.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
similarity index 96%
rename from inlong-audit/audit-service/src/main/java/entities/StatData.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
index aefc828efe..b5f450b27d 100644
--- a/inlong-audit/audit-service/src/main/java/entities/StatData.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package entities;
+package org.apache.inlong.audit.entities;
import lombok.Data;
diff --git a/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
similarity index 91%
rename from inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
index 9017caacff..d579250975 100644
--- a/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package elector;
+package org.apache.inlong.audit.selector;
+
+import org.apache.inlong.audit.selector.api.SelectorChangeListener;
-import elector.api.SelectorChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/Selector.java
similarity index 96%
rename from inlong-audit/audit-service/src/main/java/elector/api/Selector.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/Selector.java
index 5e9e564762..b9a66c7b96 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/Selector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package elector.api;
+package org.apache.inlong.audit.selector.api;
/**
* Selector
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
similarity index 95%
rename from inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
index a4af6a9598..f914269150 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package elector.api;
+package org.apache.inlong.audit.selector.api;
/**
* Selector change listener
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
similarity index 98%
rename from inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
index d037985332..b4dd399021 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package elector.api;
+package org.apache.inlong.audit.selector.api;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
similarity index 90%
rename from inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
index 42ed31a724..d7a38fbf39 100644
--- a/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package elector.api;
+package org.apache.inlong.audit.selector.api;
-import elector.impl.SelectorImpl;
+import org.apache.inlong.audit.selector.impl.SelectorImpl;
/**
* Selector factory
diff --git a/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
similarity index 88%
rename from inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
index 91eef3156f..2751321345 100644
--- a/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
@@ -15,11 +15,14 @@
* limitations under the License.
*/
-package elector.impl;
+package org.apache.inlong.audit.selector.impl;
+
+import org.apache.inlong.audit.config.ConfigConstants;
+import org.apache.inlong.audit.config.SqlConstants;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
-import elector.api.SelectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,28 +32,17 @@ import java.sql.ResultSet;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
-import static config.ConfigConstants.CACHE_PREP_STMTS;
-import static config.ConfigConstants.MAX_INIT_COUNT;
-import static config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
-import static config.SqlConstants.IS_LEADER_SQL;
-import static config.SqlConstants.RELEASE_SQL;
-import static config.SqlConstants.REPLACE_LEADER_SQL;
-import static config.SqlConstants.SEARCH_CURRENT_LEADER_SQL;
-import static config.SqlConstants.SELECTOR_SQL;
-import static config.SqlConstants.SELECT_TEST_SQL;
-
/**
* DB data source
*/
public class DBDataSource {
private static final Logger logger =
LoggerFactory.getLogger(DBDataSource.class);
- private String selectorSql = SELECTOR_SQL;
- private String replaceLeaderSql = REPLACE_LEADER_SQL;
- private String reLeaseSql = RELEASE_SQL;
- private String isLeaderSql = IS_LEADER_SQL;
- private String searchCurrentLeaderSql = SEARCH_CURRENT_LEADER_SQL;
+ private String selectorSql = SqlConstants.SELECTOR_SQL;
+ private String replaceLeaderSql = SqlConstants.REPLACE_LEADER_SQL;
+ private String reLeaseSql = SqlConstants.RELEASE_SQL;
+ private String isLeaderSql = SqlConstants.IS_LEADER_SQL;
+ private String searchCurrentLeaderSql =
SqlConstants.SEARCH_CURRENT_LEADER_SQL;
private final SelectorConfig selectorConfig;
private HikariDataSource datasource;
public AtomicInteger getConnectionFailTimes;
@@ -90,7 +82,7 @@ public class DBDataSource {
boolean initSucc = false;
int initCount = 0;
- while (!initSucc && initCount < MAX_INIT_COUNT) {
+ while (!initSucc && initCount < ConfigConstants.MAX_INIT_COUNT) {
try {
++initCount;
if (datasource == null || datasource.isClosed()) {
@@ -104,11 +96,12 @@ public class DBDataSource {
config.setAutoCommit(true);
config.setConnectionTimeout((long)
selectorConfig.getConnectionTimeout());
config.setMaxLifetime((long)
selectorConfig.getMaxLifetime());
- config.addDataSourceProperty(CACHE_PREP_STMTS,
selectorConfig.getCachePrepStmts());
- config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
selectorConfig.getPrepStmtCacheSize());
- config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+
config.addDataSourceProperty(ConfigConstants.CACHE_PREP_STMTS,
selectorConfig.getCachePrepStmts());
+
config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SIZE,
+ selectorConfig.getPrepStmtCacheSize());
+
config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT,
selectorConfig.getPrepStmtCacheSqlLimit());
- config.setConnectionTestQuery(SELECT_TEST_SQL);
+
config.setConnectionTestQuery(SqlConstants.SELECT_TEST_SQL);
datasource = new HikariDataSource(config);
}
diff --git a/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
similarity index 88%
rename from inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
index b84051fccb..45352be5c2 100644
--- a/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package elector.impl;
+package org.apache.inlong.audit.selector.impl;
+
+import org.apache.inlong.audit.config.ConfigConstants;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.selector.api.Selector;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.task.DBMonitorTask;
-import config.Configuration;
-import elector.api.Selector;
-import elector.api.SelectorConfig;
-import elector.task.DBMonitorTask;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,12 +32,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static config.ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL;
-import static config.ConfigConstants.DEFAULT_SELECTOR_THREAD_POOL_SIZE;
-import static config.ConfigConstants.KEY_RELEASE_LEADER_INTERVAL;
-import static config.ConfigConstants.KEY_SELECTOR_THREAD_POOL_SIZE;
-import static config.ConfigConstants.RANDOM_BOUND;
-
/**
* Elector Impl
*/
@@ -53,8 +49,8 @@ public class SelectorImpl extends Selector {
this.selectorConfig = selectorConfig;
this.dbDataSource = new DBDataSource(selectorConfig);
fixedThreadPool =
Executors.newFixedThreadPool(Configuration.getInstance().get(
- KEY_SELECTOR_THREAD_POOL_SIZE,
- DEFAULT_SELECTOR_THREAD_POOL_SIZE));
+ ConfigConstants.KEY_SELECTOR_THREAD_POOL_SIZE,
+ ConfigConstants.DEFAULT_SELECTOR_THREAD_POOL_SIZE));
}
/**
@@ -97,8 +93,8 @@ public class SelectorImpl extends Selector {
}
try {
-
TimeUnit.SECONDS.sleep(Configuration.getInstance().get(KEY_RELEASE_LEADER_INTERVAL,
- DEFAULT_RELEASE_LEADER_INTERVAL));
+
TimeUnit.SECONDS.sleep(Configuration.getInstance().get(ConfigConstants.KEY_RELEASE_LEADER_INTERVAL,
+ ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL));
} catch (Exception exception) {
logger.error("Exception :{}", exception.getMessage());
}
@@ -195,7 +191,7 @@ public class SelectorImpl extends Selector {
isLeader = false;
sleepTime = selectorConfig.getTryToBeLeaderInterval()
- + random.nextInt(RANDOM_BOUND);
+ + random.nextInt(ConfigConstants.RANDOM_BOUND);
}
}
diff --git a/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
similarity index 92%
rename from inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
rename to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
index 8ccf92ab3b..c1dc8e3d5c 100644
--- a/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package elector.task;
+package org.apache.inlong.audit.selector.task;
+
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.impl.DBDataSource;
-import elector.api.SelectorConfig;
-import elector.impl.DBDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
new file mode 100644
index 0000000000..accd22ebbc
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import org.apache.inlong.audit.channel.DataQueue;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.entities.StartEndTime;
+import org.apache.inlong.audit.entities.StatData;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_IDS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_IDS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL;
+import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static org.apache.inlong.audit.entities.AuditCycle.DAY;
+import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
+
+/**
+ * Jdbc source
+ */
+@Data
+public class JdbcSource {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
+ private final ConcurrentHashMap<Integer, ScheduledExecutorService>
statTimers = new ConcurrentHashMap<>();
+ private DataQueue dataQueue;
+ private List<String> auditIds;
+ private int querySqlTimeout;
+ private DataSource dataSource;
+ private String querySql;
+ private SourceConfig sourceConfig;
+
+ private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ private static final int MAX_MINUTE = 60;
+
+    /**
+     * Creates a JDBC source that reads stat rows and pushes them into the given queue.
+     *
+     * @param dataQueue    queue that receives every row read by the stat queries
+     * @param sourceConfig connection settings, query SQL and audit cycle of the source database
+     */
+    public JdbcSource(DataQueue dataQueue, SourceConfig sourceConfig) {
+        this.dataQueue = dataQueue;
+        this.sourceConfig = sourceConfig;
+        // The audit ids are configured as a single ';'-separated string.
+        String configuredAuditIds = Configuration.getInstance().get(KEY_AUDIT_IDS, DEFAULT_AUDIT_IDS);
+        this.auditIds = Arrays.asList(configuredAuditIds.split(";"));
+    }
+
+ /**
+ * Initializes this source by building its connection pool through
+ * {@link #createDataSource()}. Called by {@link #start()} before any stat task is scheduled.
+ */
+ public void init() {
+ createDataSource();
+ }
+
+    /**
+     * Starts the source: builds the connection pool, then schedules one periodic
+     * {@link StatServer} task per look-back step, each on its own single-thread scheduler.
+     *
+     * <p>Steps run from 1 (inclusive) to {@code sourceConfig.getStatBackTimes()} (exclusive).
+     * The step number is also used as the task's initial delay in minutes.
+     */
+    public void start() {
+        init();
+        final int configuredInterval = Configuration.getInstance().get(KEY_SOURCE_DB_STAT_INTERVAL,
+                DEFAULT_SOURCE_DB_STAT_INTERVAL);
+        // A daily audit cycle ignores the configured interval and uses HOUR.getValue() minutes.
+        final boolean dailyCycle = sourceConfig.getAuditCycle() == DAY;
+        final int intervalMinutes = dailyCycle ? HOUR.getValue() : configuredInterval;
+
+        int step = 1;
+        while (step < sourceConfig.getStatBackTimes()) {
+            ScheduledExecutorService scheduler = statTimers.computeIfAbsent(step,
+                    key -> Executors.newSingleThreadScheduledExecutor());
+            scheduler.scheduleWithFixedDelay(new StatServer(step), step, intervalMinutes, TimeUnit.MINUTES);
+            step++;
+        }
+    }
+
+ ;
+
+    /**
+     * Builds the stat windows that cover one whole hour, each {@code dataCycle} minutes long.
+     *
+     * <p>The target hour is the current hour shifted back by {@code hoursAgo}. Each window starts
+     * at {@code hh:mm:00} and ends at second 0 of the window's last minute
+     * ({@code mm + dataCycle - 1}).
+     *
+     * @param hoursAgo  how many hours before now the target hour lies
+     * @param dataCycle window length in minutes; must be positive
+     * @return the windows in chronological order, with start and end times formatted as
+     *         {@code yyyy-MM-dd HH:mm:ss}
+     * @throws IllegalArgumentException if {@code dataCycle} is not positive
+     */
+    public List<StartEndTime> getStatCycleOfMinute(int hoursAgo, int dataCycle) {
+        if (dataCycle <= 0) {
+            // A non-positive step would never advance the loop below.
+            throw new IllegalArgumentException("dataCycle must be positive, but was " + dataCycle);
+        }
+        // Take a single snapshot of "now" so that every window belongs to the same hour, even if
+        // the wall clock crosses an hour boundary while the list is being built.
+        Calendar hourStart = Calendar.getInstance();
+        hourStart.add(Calendar.HOUR_OF_DAY, -hoursAgo);
+        hourStart.set(Calendar.SECOND, 0);
+        // SimpleDateFormat is not thread-safe, so it stays local; one instance per call is enough.
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+        List<StartEndTime> statCycleList = new ArrayList<>();
+        for (int minute = 0; minute < MAX_MINUTE; minute += dataCycle) {
+            Calendar calendar = (Calendar) hourStart.clone();
+            StartEndTime statCycle = new StartEndTime();
+
+            calendar.set(Calendar.MINUTE, minute);
+            statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+            calendar.set(Calendar.MINUTE, minute + dataCycle - 1);
+            statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+
+            statCycleList.add(statCycle);
+        }
+        return statCycleList;
+    }
+
+    /**
+     * Builds the single stat window that covers one whole day.
+     *
+     * <p>The window runs from {@code 00:00:00} to {@code 23:59:00} of the day that lies
+     * {@code daysAgo} days before today.
+     *
+     * @param daysAgo how many days before today the target day lies
+     * @return a mutable list holding exactly one window, with start and end times formatted as
+     *         {@code yyyy-MM-dd HH:mm:ss}
+     */
+    public List<StartEndTime> getStatCycleOfDay(int daysAgo) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DATE, -daysAgo);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+        StartEndTime statCycle = new StartEndTime();
+        statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+        calendar.set(Calendar.HOUR_OF_DAY, 23);
+        calendar.set(Calendar.MINUTE, 59);
+        statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+
+        // A plain ArrayList instead of double-brace initialization: the latter creates an
+        // anonymous subclass that keeps a hidden reference to this JdbcSource.
+        List<StartEndTime> statCycleList = new ArrayList<>(1);
+        statCycleList.add(statCycle);
+        return statCycleList;
+    }
+
+    /**
+     * Builds the HikariCP connection pool for the source database and stores it in
+     * {@code dataSource}.
+     *
+     * <p>Driver, URL and credentials come from {@code sourceConfig}. The connection timeout,
+     * prepared-statement cache settings and maximum pool size are read from the global
+     * {@link Configuration}, each falling back to its default constant.
+     */
+    protected void createDataSource() {
+        final Configuration settings = Configuration.getInstance();
+        final HikariConfig poolConfig = new HikariConfig();
+
+        // Connection identity.
+        poolConfig.setDriverClassName(sourceConfig.getDriverClassName());
+        poolConfig.setJdbcUrl(sourceConfig.getJdbcUrl());
+        poolConfig.setUsername(sourceConfig.getUsername());
+        poolConfig.setPassword(sourceConfig.getPassword());
+
+        // Pool behaviour.
+        poolConfig.setConnectionTimeout(
+                settings.get(KEY_DATASOURCE_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT));
+
+        // Driver-level prepared-statement caching.
+        poolConfig.addDataSourceProperty(CACHE_PREP_STMTS,
+                settings.get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS));
+        poolConfig.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
+                settings.get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE));
+        poolConfig.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+                settings.get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
+
+        poolConfig.setMaximumPoolSize(
+                settings.get(KEY_DATASOURCE_POOL_SIZE, DEFAULT_DATASOURCE_POOL_SIZE));
+
+        dataSource = new HikariDataSource(poolConfig);
+    }
+
+    /**
+     * Destroys this source: stops every stat timer and closes the connection pool.
+     *
+     * <p>Timers are stopped with {@code shutdown()}, so a stat round that is already running is
+     * not interrupted; a query still in flight when the pool closes fails and is logged by
+     * the query itself.
+     */
+    public void destroy() {
+        for (ScheduledExecutorService timer : statTimers.values()) {
+            timer.shutdown();
+        }
+        // Forget the terminated executors so that a later start() creates fresh ones instead of
+        // submitting tasks to schedulers that would reject them.
+        statTimers.clear();
+
+        // Release the pooled connections. The field is typed as DataSource and has a generated
+        // setter, so only close it when it really is the Hikari pool.
+        if (dataSource instanceof HikariDataSource) {
+            ((HikariDataSource) dataSource).close();
+        }
+    }
+
+ /**
+ * Stat server
+ */
+ class StatServer implements Runnable, AutoCloseable {
+
+ private final int statBackTimes;
+
+ /**
+ * Creates a stat task for one look-back step.
+ *
+ * @param statBackTimes how far back this task looks: passed as the number of days to
+ * getStatCycleOfDay for a daily audit cycle, otherwise as the number of hours to
+ * getStatCycleOfMinute
+ */
+ public StatServer(int statBackTimes) {
+ this.statBackTimes = statBackTimes;
+ }
+
+        /**
+         * Runs one stat round for this look-back step and logs how long it took.
+         *
+         * <p>Failures are logged instead of propagated: an exception escaping a task scheduled
+         * with {@code scheduleWithFixedDelay} suppresses all of its subsequent executions.
+         */
+        @Override
+        public void run() {
+            long startTimestamp = System.currentTimeMillis();
+            LOG.info("Stat source data at {},stat back times:{}", startTimestamp, statBackTimes);
+
+            try {
+                statByStep();
+            } catch (Exception exception) {
+                // Keep the periodic schedule alive; the next run will try again.
+                LOG.error("Stat source data failed,stat back times:{}", statBackTimes, exception);
+            }
+
+            long timeCost = System.currentTimeMillis() - startTimestamp;
+            LOG.info("Stat source data cost time:{}ms,stat back times:{}", timeCost, statBackTimes);
+        }
+
+        /**
+         * Aggregates every configured audit id asynchronously and blocks until all of them have
+         * finished. Does nothing when no audit id is configured.
+         */
+        public void statByStep() {
+            if (auditIds.isEmpty()) {
+                return;
+            }
+            // One asynchronous aggregation per audit id; toArray() launches them all before the join.
+            CompletableFuture<?>[] pending = auditIds.stream()
+                    .map(id -> CompletableFuture.runAsync(() -> aggregate(id)))
+                    .toArray(CompletableFuture[]::new);
+            CompletableFuture.allOf(pending).join();
+        }
+
+        /**
+         * Queries the source database for one audit id, window by window, and logs the time
+         * each window took.
+         *
+         * <p>A daily audit cycle yields a single whole-day window; any other cycle yields the
+         * windows of one hour, each {@code getAuditCycle().getValue()} minutes long.
+         *
+         * @param auditId audit id to aggregate
+         */
+        public void aggregate(String auditId) {
+            List<StartEndTime> windows;
+            if (sourceConfig.getAuditCycle() == DAY) {
+                windows = getStatCycleOfDay(statBackTimes);
+            } else {
+                windows = getStatCycleOfMinute(statBackTimes, sourceConfig.getAuditCycle().getValue());
+            }
+
+            for (StartEndTime window : windows) {
+                long queryStart = System.currentTimeMillis();
+                query(window.getStartTime(), window.getEndTime(), auditId);
+                long timeCost = System.currentTimeMillis() - queryStart;
+                LOG.info("[{}]-[{}],{},stat back times:{},audit id:{},cost:{}ms",
+                        window.getStartTime(), window.getEndTime(),
+                        sourceConfig.getAuditCycle(),
+                        statBackTimes, auditId, timeCost);
+            }
+        }
+
+        /**
+         * Runs the configured query for one window and audit id and pushes every result row
+         * into the data queue.
+         *
+         * <p>The SQL in {@code sourceConfig.getQuerySql()} receives three string parameters, in
+         * this order: start time, end time, audit id. Each row is read by position: group id,
+         * stream id, audit id, audit tag, count, size, delay. Failures are logged, never thrown.
+         *
+         * @param startTime window start; also stored as the log timestamp of every row
+         * @param endTime   window end
+         * @param auditId   audit id to query
+         */
+        public void query(String startTime, String endTime, String auditId) {
+            // No isClosed() check on the borrowed connection: prepareStatement() already fails on
+            // a closed connection, and rebuilding the pool at that point would leak the old pool
+            // while this call kept using the stale connection.
+            try (Connection connection = dataSource.getConnection();
+                    PreparedStatement pstat = connection.prepareStatement(sourceConfig.getQuerySql())) {
+                pstat.setString(1, startTime);
+                pstat.setString(2, endTime);
+                pstat.setString(3, auditId);
+                try (ResultSet resultSet = pstat.executeQuery()) {
+                    while (resultSet.next()) {
+                        StatData data = new StatData();
+                        data.setLogTs(startTime);
+                        data.setInlongGroupId(resultSet.getString(1));
+                        data.setInlongStreamId(resultSet.getString(2));
+                        data.setAuditId(resultSet.getString(3));
+                        data.setAuditTag(resultSet.getString(4));
+                        data.setCount(resultSet.getLong(5));
+                        data.setSize(resultSet.getLong(6));
+                        data.setDelay(resultSet.getLong(7));
+                        dataQueue.push(data);
+                    }
+                } catch (SQLException sqlException) {
+                    LOG.error("Query has SQL exception! ", sqlException);
+                }
+            } catch (Exception exception) {
+                LOG.error("Query has exception! ", exception);
+            }
+        }
+
+ /**
+ * No-op. This task has no resources of its own to release: its only field is the
+ * look-back step, and query() closes its connection, statement and result set itself.
+ */
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
b/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
deleted file mode 100644
index b2c8cc8f2e..0000000000
--- a/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package source;
-
-import channel.DataQueue;
-import entities.SourceConfig;
-import entities.StartEndTime;
-import lombok.Data;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-
-import java.sql.SQLException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * Abstract source
- */
-@Data
-public class AbstractSource {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractSource.class);
- protected final ConcurrentHashMap<Integer, ScheduledExecutorService>
statTimers = new ConcurrentHashMap<>();
- protected DataQueue dataQueue;
- protected List<String> auditIds;
- protected int querySqlTimeout;
- protected DataSource dataSource;
- protected String querySql;
- protected SourceConfig sourceConfig;
-
- protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-
- public AbstractSource(DataQueue dataQueue) {
- this.dataQueue = dataQueue;
- }
-
- /**
- * Get stat cycle of minute
- *
- * @param beforeHour
- * @param dataCycle
- * @return
- */
- public List<StartEndTime> getStatCycleMinute(int beforeHour, int
dataCycle) {
- List<StartEndTime> statCycleList = new LinkedList<>();
- for (int step = 0; step < 60; step = step + dataCycle) {
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.HOUR_OF_DAY, -beforeHour);
-
- calendar.set(Calendar.MINUTE, step);
- calendar.set(Calendar.SECOND, 0);
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
- StartEndTime statCycle = new StartEndTime();
- statCycle.setStartTime(dateFormat.format(calendar.getTime()));
-
- calendar.set(Calendar.MINUTE, step + dataCycle - 1);
- calendar.set(Calendar.SECOND, 0);
- statCycle.setEndTime(dateFormat.format(calendar.getTime()));
- statCycleList.add(statCycle);
- }
- return statCycleList;
- }
-
- /**
- * Get stat cycle of day
- *
- * @param beforeDay
- * @return
- */
- public List<StartEndTime> getStatCycleDay(int beforeDay) {
- StartEndTime statCycle = new StartEndTime();
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.DATE, -beforeDay);
- calendar.set(Calendar.HOUR_OF_DAY, 0);
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- calendar.set(Calendar.MILLISECOND, 0);
-
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
- statCycle.setStartTime(dateFormat.format(calendar.getTime()));
-
- calendar.set(Calendar.HOUR_OF_DAY, 23);
- calendar.set(Calendar.MINUTE, 59);
- statCycle.setEndTime(dateFormat.format(calendar.getTime()));
- return new ArrayList<StartEndTime>() {
-
- {
- add(statCycle);
- }
- };
- }
-
- /**
- * Destory
- */
- public void destory() {
- try {
- dataSource.getConnection().close();
- } catch (SQLException exception) {
- LOG.error(exception.getMessage());
- }
- }
-}
diff --git a/inlong-audit/audit-service/src/main/java/source/SourceStat.java
b/inlong-audit/audit-service/src/main/java/source/SourceStat.java
deleted file mode 100644
index 4477d25532..0000000000
--- a/inlong-audit/audit-service/src/main/java/source/SourceStat.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package source;
-
-/**
- * Source stat interface.
- */
-public interface SourceStat extends Runnable {
-
- public void statByStep();
-
- public void aggregate(String auditId);
-
- public void query(String startTime, String endTime, String auditId);
-}
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index ea25c3daaf..54ebd8fcc2 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -84,6 +84,12 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ <version>${HikariCP.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>