This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 778cbe6050 [INLONG-11286][Audit] Optimize the statistics of daily Audit data (#11312)
778cbe6050 is described below
commit 778cbe6050ff4c60b7fd17dbeabfe614289cac7e
Author: doleyzi <[email protected]>
AuthorDate: Wed Oct 9 19:32:17 2024 +0800
[INLONG-11286][Audit] Optimize the statistics of daily Audit data (#11312)
---
.../inlong/audit/config/ConfigConstants.java | 6 +-
.../apache/inlong/audit/config/SqlConstants.java | 12 +-
.../inlong/audit/entities/PartitionEntity.java | 71 ++++++++
.../apache/inlong/audit/entities/SourceConfig.java | 4 +-
.../org/apache/inlong/audit/main/Application.java | 3 +
.../apache/inlong/audit/service/EtlService.java | 153 ++++++-----------
.../inlong/audit/service/PartitionManager.java | 186 +++++++++++++++++++++
.../org/apache/inlong/audit/sink/AuditSink.java | 24 +++
.../org/apache/inlong/audit/sink/CacheSink.java | 2 +-
.../org/apache/inlong/audit/sink/JdbcSink.java | 152 ++---------------
.../org/apache/inlong/audit/source/JdbcSource.java | 37 +---
.../org/apache/inlong/audit/utils/JdbcUtils.java | 37 ++++
inlong-audit/sql/apache_inlong_audit_mysql.sql | 4 +-
13 files changed, 413 insertions(+), 278 deletions(-)
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index d5afb1306e..d74e162f78 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -57,9 +57,6 @@ public class ConfigConstants {
public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
public static final String KEY_CONFIG_UPDATE_INTERVAL_SECONDS =
"config.update.interval.seconds";
public static final int DEFAULT_CONFIG_UPDATE_INTERVAL_SECONDS = 60;
-
- public static final String KEY_ENABLE_MANAGE_PARTITIONS =
"enable.manage.partitions";
- public static final boolean DEFAULT_ENABLE_MANAGE_PARTITIONS = true;
public static final String KEY_CHECK_PARTITION_INTERVAL_HOURS =
"check.partition.interval.hours";
public static final int DEFAULT_CHECK_PARTITION_INTERVAL_HOURS = 6;
@@ -113,4 +110,7 @@ public class ConfigConstants {
public static final int MAX_INIT_COUNT = 2;
public static final int RANDOM_BOUND = 10;
+ public static final String KEY_ENABLE_STAT_AUDIT_DAY =
"enable.stat.audit.day";
+ public static final boolean DEFAULT_ENABLE_STAT_AUDIT_DAY = true;
+
}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index b48368b921..04fee349b5 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -170,8 +170,12 @@ public class SqlConstants {
public static final String KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
"audit.data.temp.delete.partition.sql";
public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
"ALTER TABLE audit_data_temp DROP PARTITION %s";
-
- public static final String KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
"audit.data.temp.check.partition.sql";
- public static final String DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
- "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE
TABLE_NAME = 'audit_data_temp' and PARTITION_NAME = ?";
+ public static final String KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL =
"audit.data.check.partition.sql";
+ public static final String DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL =
+ "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE
TABLE_NAME = '%s' and PARTITION_NAME = '%s'";
+ public static final String KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL =
"audit.data.day.add.partition.sql";
+ public static final String DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL =
+ "ALTER TABLE audit_data_day ADD PARTITION (PARTITION %s VALUES
LESS THAN (TO_DAYS('%s')))";
+ public static final String TABLE_AUDIT_DATA_DAY = "audit_data_day";
+ public static final String TABLE_AUDIT_DATA_TEMP = "audit_data_temp";
}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java
new file mode 100644
index 0000000000..4704ea1031
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.entities;
+
+import org.apache.inlong.audit.config.Configuration;
+
+import lombok.Data;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL;
+
+@Data
+public class PartitionEntity {
+
+ private final String tableName;
+ private final String addPartitionStatement;
+ private final String deletePartitionStatement;
+ private final DateTimeFormatter FORMATTER_YYMMDDHH =
DateTimeFormatter.ofPattern("yyyyMMdd");
+ private final DateTimeFormatter FORMATTER_YY_MM_DD_HH =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+ private String formatPartitionName(LocalDate date) {
+ return "p" + date.format(FORMATTER_YYMMDDHH);
+ }
+
+ public PartitionEntity(String tableName, String addPartitionStatement,
String deletePartitionStatement) {
+ this.tableName = tableName;
+ this.addPartitionStatement = addPartitionStatement;
+ this.deletePartitionStatement = deletePartitionStatement;
+ }
+
+ public String getAddPartitionSql(long daysToAdd) {
+ String partitionValue = LocalDate.now().plusDays(daysToAdd +
1).format(FORMATTER_YY_MM_DD_HH);
+ return String.format(addPartitionStatement,
getAddPartitionName(daysToAdd), partitionValue);
+ }
+
+ public String getDeletePartitionSql(long daysToDelete) {
+ return String.format(deletePartitionStatement,
getDeletePartitionName(daysToDelete));
+ }
+
+ public String getCheckPartitionSql(long partitionDay, boolean isDelete) {
+ String partitionName = isDelete ? getDeletePartitionName(partitionDay)
: getAddPartitionName(partitionDay);
+ return
String.format(Configuration.getInstance().get(KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL,
+ DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL), tableName,
partitionName);
+ }
+
+ public String getAddPartitionName(long daysToAdd) {
+ return formatPartitionName(LocalDate.now().plusDays(daysToAdd));
+ }
+
+ public String getDeletePartitionName(long daysToDelete) {
+ return formatPartitionName(LocalDate.now().minusDays(daysToDelete));
+ }
+}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
index 88730a203a..b43b72c052 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
@@ -32,7 +32,7 @@ public class SourceConfig {
private int statBackTimes;
private final String driverClassName;
private final String jdbcUrl;
- private final String username;
+ private final String userName;
private final String password;
private boolean needJoin = false;
@@ -48,7 +48,7 @@ public class SourceConfig {
this.statBackTimes = statBackTimes;
this.driverClassName = driverClassName;
this.jdbcUrl = jdbcUrl;
- this.username = username;
+ this.userName = username;
this.password = password;
}
}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
index 067667133d..7df068473a 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
@@ -25,6 +25,7 @@ import org.apache.inlong.audit.selector.api.SelectorFactory;
import org.apache.inlong.audit.service.ApiService;
import org.apache.inlong.audit.service.ConfigService;
import org.apache.inlong.audit.service.EtlService;
+import org.apache.inlong.audit.service.PartitionManager;
import org.apache.inlong.audit.utils.JdbcUtils;
import org.apache.inlong.common.util.NetworkUtils;
@@ -51,6 +52,8 @@ public class Application {
// Periodically obtain audit id and audit course configuration
from DB
ConfigService.getInstance().start();
+ PartitionManager.getInstance().start();
+
// Etl service aggregate the data from the data source and store
the aggregated data to the target storage
etlService.start();
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index 95e1cddd75..a5eb286a08 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -26,11 +26,14 @@ import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.SinkConfig;
import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.sink.AuditSink;
import org.apache.inlong.audit.sink.CacheSink;
import org.apache.inlong.audit.sink.JdbcSink;
import org.apache.inlong.audit.source.JdbcSource;
import org.apache.inlong.audit.utils.JdbcUtils;
+import com.github.benmanes.caffeine.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,10 +41,12 @@ import java.util.LinkedList;
import java.util.List;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_STAT_AUDIT_DAY;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_STAT_AUDIT_DAY;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
@@ -60,113 +65,70 @@ import static org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_STAT_SQL;
public class EtlService {
private static final Logger LOGGER =
LoggerFactory.getLogger(EtlService.class);
- private JdbcSource mysqlSourceOfTemp;
- private JdbcSource mysqlSourceOfTenMinutesCache;
- private JdbcSource mysqlSourceOfHalfHourCache;
- private JdbcSource mysqlSourceOfHourCache;
- private JdbcSink mysqlSinkOfDay;
- private final List<JdbcSource> auditJdbcSources = new LinkedList<>();
- private JdbcSink mysqlSinkOfTemp;
- private CacheSink cacheSinkOfTenMinutesCache;
- private CacheSink cacheSinkOfHalfHourCache;
- private CacheSink cacheSinkOfHourCache;
+
+ // Statistics of original audit data
+ private final List<JdbcSource> originalSources = new LinkedList<>();
private final int queueSize;
- private final int statBackTimes;
private final String serviceId;
+ private final Configuration configuration;
+
+ private final List<JdbcSource> dataFlowSources = new LinkedList<>();
+ private final List<AuditSink> dataFlowSinks = new LinkedList<>();
public EtlService() {
- queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
+ configuration = Configuration.getInstance();
+ queueSize = configuration.get(KEY_DATA_QUEUE_SIZE,
DEFAULT_DATA_QUEUE_SIZE);
- statBackTimes =
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
- DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
- serviceId = Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID,
DEFAULT_SELECTOR_SERVICE_ID);
+ serviceId = configuration.get(KEY_SELECTOR_SERVICE_ID,
DEFAULT_SELECTOR_SERVICE_ID);
}
- /**
- * Start the etl service.
- */
public void start() {
- mysqlToMysqlOfDay();
- mysqlToTenMinutesCache();
- mysqlToHalfHourCache();
- mysqlToHourCache();
- }
-
- /**
- * Aggregate data from mysql data source and store the aggregated data in
the target mysql table.
- * The audit data cycle is days,and stored in table of day.
- */
- private void mysqlToMysqlOfDay() {
- DataQueue dataQueue = new DataQueue(queueSize);
-
- mysqlSourceOfTemp = new JdbcSource(dataQueue,
buildMysqlSourceConfig(AuditCycle.DAY,
-
Configuration.getInstance().get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES,
- DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES)));
- mysqlSourceOfTemp.start();
-
- SinkConfig sinkConfig =
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
- DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
- mysqlSinkOfDay = new JdbcSink(dataQueue, sinkConfig);
- mysqlSinkOfDay.start();
- }
-
- /**
- * Aggregate data from mysql data source and store in local cache for
openapi.
- */
- private void mysqlToTenMinutesCache() {
- DataQueue dataQueue = new DataQueue(queueSize);
- mysqlSourceOfTenMinutesCache =
- new JdbcSource(dataQueue,
buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes));
- mysqlSourceOfTenMinutesCache.start();
-
- cacheSinkOfTenMinutesCache = new CacheSink(dataQueue,
TenMinutesCache.getInstance().getCache());
- cacheSinkOfTenMinutesCache.start();
- }
+ int statBackTimes =
configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+ DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
- /**
- * Aggregate data from mysql data source and store in local cache for
openapi.
- */
- private void mysqlToHalfHourCache() {
- DataQueue dataQueue = new DataQueue(queueSize);
- mysqlSourceOfHalfHourCache =
- new JdbcSource(dataQueue,
buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes));
- mysqlSourceOfHalfHourCache.start();
+ startDataFlow(AuditCycle.MINUTE_10, statBackTimes,
TenMinutesCache.getInstance().getCache());
+ startDataFlow(AuditCycle.MINUTE_30, statBackTimes,
HalfHourCache.getInstance().getCache());
+ startDataFlow(AuditCycle.HOUR, statBackTimes,
HourCache.getInstance().getCache());
- cacheSinkOfHalfHourCache = new CacheSink(dataQueue,
HalfHourCache.getInstance().getCache());
- cacheSinkOfHalfHourCache.start();
+ if (configuration.get(KEY_ENABLE_STAT_AUDIT_DAY,
DEFAULT_ENABLE_STAT_AUDIT_DAY)) {
+ statBackTimes =
configuration.get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES,
DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES);
+ startDataFlow(AuditCycle.DAY, statBackTimes, null);
+ }
}
- /**
- * Aggregate data from mysql data source and store in local cache for
openapi.
- */
- private void mysqlToHourCache() {
+ private void startDataFlow(AuditCycle cycle, int backTimes, Cache<String,
StatData> cache) {
DataQueue dataQueue = new DataQueue(queueSize);
- mysqlSourceOfHourCache = new JdbcSource(dataQueue,
buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes));
- mysqlSourceOfHourCache.start();
-
- cacheSinkOfHourCache = new CacheSink(dataQueue,
HourCache.getInstance().getCache());
- cacheSinkOfHourCache.start();
+ JdbcSource source = new JdbcSource(dataQueue,
buildMysqlSourceConfig(cycle, backTimes));
+ source.start();
+ dataFlowSources.add(source);
+
+ AuditSink sink;
+ if (cache != null) {
+ sink = new CacheSink(dataQueue, cache);
+ } else {
+ SinkConfig sinkConfig =
buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
+ DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
+ sink = new JdbcSink(dataQueue, sinkConfig);
+ }
+ sink.start();
+ dataFlowSinks.add(sink);
}
- /**
- * Aggregate data from clickhouse data source and store the aggregated
data in the target mysql table.
- * The default audit data cycle is 5 minutes,and stored in a temporary
table.
- * Support multiple audit source clusters.
- */
public void auditSourceToMysql() {
DataQueue dataQueue = new DataQueue(queueSize);
List<JdbcConfig> sourceList =
ConfigService.getInstance().getAuditSourceByServiceId(serviceId);
for (JdbcConfig jdbcConfig : sourceList) {
JdbcSource jdbcSource = new JdbcSource(dataQueue,
buildAuditJdbcSourceConfig(jdbcConfig));
jdbcSource.start();
- auditJdbcSources.add(jdbcSource);
+ originalSources.add(jdbcSource);
LOGGER.info("Audit source to mysql jdbc config:{}", jdbcConfig);
}
- SinkConfig sinkConfig =
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
+ SinkConfig sinkConfig =
buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL));
- mysqlSinkOfTemp = new JdbcSink(dataQueue, sinkConfig);
- mysqlSinkOfTemp.start();
+ JdbcSink sink = new JdbcSink(dataQueue, sinkConfig);
+ sink.start();
+ dataFlowSinks.add(sink);
}
/**
@@ -193,7 +155,7 @@ public class EtlService {
private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int
statBackTimes) {
JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
return new SourceConfig(auditCycle,
-
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
+ configuration.get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
statBackTimes,
jdbcConfig.getDriverClass(),
@@ -209,9 +171,9 @@ public class EtlService {
*/
private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) {
return new SourceConfig(AuditCycle.MINUTE_5,
- Configuration.getInstance().get(KEY_SOURCE_STAT_SQL,
+ configuration.get(KEY_SOURCE_STAT_SQL,
DEFAULT_SOURCE_STAT_SQL),
-
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+ configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES),
jdbcConfig.getDriverClass(),
jdbcConfig.getJdbcUrl(),
@@ -224,21 +186,14 @@ public class EtlService {
* Stop the etl service,and destroy related resources.
*/
public void stop() {
- mysqlSourceOfTemp.destroy();
- mysqlSinkOfDay.destroy();
-
- for (JdbcSource source : auditJdbcSources) {
+ for (JdbcSource source : originalSources) {
source.destroy();
}
- if (null != mysqlSinkOfTemp)
- mysqlSinkOfTemp.destroy();
-
- mysqlSourceOfTenMinutesCache.destroy();
- mysqlSourceOfHalfHourCache.destroy();
- mysqlSourceOfHourCache.destroy();
-
- cacheSinkOfTenMinutesCache.destroy();
- cacheSinkOfHalfHourCache.destroy();
- cacheSinkOfHourCache.destroy();
+ for (JdbcSource source : dataFlowSources) {
+ source.destroy();
+ }
+ for (AuditSink sink : dataFlowSinks) {
+ sink.destroy();
+ }
}
}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java
new file mode 100644
index 0000000000..5238bc1d05
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.entities.PartitionEntity;
+import org.apache.inlong.audit.utils.JdbcUtils;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL;
+import static org.apache.inlong.audit.config.SqlConstants.TABLE_AUDIT_DATA_DAY;
+import static
org.apache.inlong.audit.config.SqlConstants.TABLE_AUDIT_DATA_TEMP;
+
+public class PartitionManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionManager.class);
+ private static volatile PartitionManager partitionManager = null;
+ private final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor();
+ private DataSource dataSource;
+ private final PartitionEntity auditDayTable;
+ private final PartitionEntity auditTempTable;
+ private final Configuration configuration;
+
+ public static PartitionManager getInstance() {
+ if (partitionManager == null) {
+ synchronized (PartitionManager.class) {
+ if (partitionManager == null) {
+ partitionManager = new PartitionManager();
+ }
+ }
+ }
+ return partitionManager;
+ }
+
+ private PartitionManager() {
+ configuration = Configuration.getInstance();
+ createDataSource();
+ auditDayTable = createAndAddPartition(TABLE_AUDIT_DATA_DAY,
+ KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL,
+ DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL,
+ null,
+ null);
+ auditTempTable = createAndAddPartition(TABLE_AUDIT_DATA_TEMP,
+ KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
+ DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
+ KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
+ DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL);
+ }
+
+ private PartitionEntity createAndAddPartition(String tableName,
+ String addPartitionKey,
+ String defaultAddPartitionSql,
+ String deletePartitionKey,
+ String defaultDeletePartitionSql) {
+ String addPartitionSql = configuration.get(addPartitionKey,
defaultAddPartitionSql);
+ String deletePartitionSql =
+ deletePartitionKey != null ?
configuration.get(deletePartitionKey, defaultDeletePartitionSql) : null;
+ PartitionEntity partitionEntity = new PartitionEntity(tableName,
addPartitionSql, deletePartitionSql);
+ addPartition(partitionEntity, 0);
+ return partitionEntity;
+ }
+
+ public void start() {
+ long intervalHours =
+ configuration.get(KEY_CHECK_PARTITION_INTERVAL_HOURS,
DEFAULT_CHECK_PARTITION_INTERVAL_HOURS);
+ timer.scheduleWithFixedDelay(this::executePartitionManagement, 0,
intervalHours, TimeUnit.HOURS);
+ }
+
+ private void executePartitionManagement() {
+ try {
+ managePartition(auditDayTable, false);
+ managePartition(auditTempTable, true);
+ } catch (Exception e) {
+ LOGGER.error("Error occurred while managing partitions", e);
+ }
+ }
+
+ private void managePartition(PartitionEntity partitionEntity, boolean
delete) {
+ addPartition(partitionEntity, 1);
+ if (delete) {
+ long storageDays =
+ configuration.get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS,
DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
+ deletePartition(partitionEntity, storageDays);
+ }
+ }
+
+ private void addPartition(PartitionEntity partitionEntity, long daysToAdd)
{
+ String partitionName = partitionEntity.getAddPartitionName(daysToAdd);
+ if (isPartitionExist(partitionEntity.getCheckPartitionSql(daysToAdd,
false))) {
+ LOGGER.info("Partition [{}] of [{}] already exists. Don`t need to
add.", partitionName,
+ partitionEntity.getTableName());
+ return;
+ }
+ executeUpdate(partitionEntity.getAddPartitionSql(daysToAdd));
+ }
+
+ private void deletePartition(PartitionEntity partitionEntity, long
daysToDelete) {
+ String partitionName =
partitionEntity.getDeletePartitionName(daysToDelete);
+ if
(!isPartitionExist(partitionEntity.getCheckPartitionSql(daysToDelete, true))) {
+ LOGGER.info("Partition [{}] of [{}] does not exist. Don`t need to
delete.", partitionName,
+ partitionEntity.getTableName());
+ return;
+ }
+ executeUpdate(partitionEntity.getDeletePartitionSql(daysToDelete));
+ }
+
+ private boolean isPartitionExist(String querySql) {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(querySql)) {
+ return isPartitionInResultSet(statement);
+ } catch (SQLException exception) {
+ LOGGER.error("An exception occurred while checking partition
[{}]:", querySql, exception);
+ }
+ return false;
+ }
+
+ private boolean isPartitionInResultSet(PreparedStatement statement) {
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ return resultSet.getInt("count") > 0;
+ }
+ } catch (SQLException sqlException) {
+ LOGGER.error("An error occurred while processing the result set:",
sqlException);
+ }
+ return false;
+ }
+
+ private void executeUpdate(String sql) {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(sql)) {
+ statement.executeUpdate();
+ LOGGER.info("Success to manage partition, execute SQL: {}", sql);
+ } catch (SQLException e) {
+ LOGGER.error("Failed to execute update: {}", sql, e);
+ }
+ }
+
+ private void createDataSource() {
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+ HikariConfig hikariConfig = JdbcUtils.buildHikariConfig(
+ jdbcConfig.getDriverClass(),
+ jdbcConfig.getJdbcUrl(),
+ jdbcConfig.getUserName(),
+ jdbcConfig.getPassword());
+ dataSource = new HikariDataSource(hikariConfig);
+ }
+}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java
new file mode 100644
index 0000000000..be7250e2dc
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+public interface AuditSink {
+
+ void start();
+ void destroy();
+}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
index 6f5f809a3d..a695d5d9b5 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
@@ -38,7 +38,7 @@ import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_
/**
* Cache sink
*/
-public class CacheSink {
+public class CacheSink implements AuditSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(CacheSink.class);
private final ScheduledExecutorService sinkTimer =
Executors.newSingleThreadScheduledExecutor();
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index ac9afc50d4..e802984da9 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.channel.DataQueue;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.SinkConfig;
import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.JdbcUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
@@ -31,75 +32,41 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.time.LocalDate;
-import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_MANAGE_PARTITIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_MANAGE_PARTITIONS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL;
-import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
-import static
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
/**
* Jdbc sink
*/
-public class JdbcSink implements AutoCloseable {
+public class JdbcSink implements AutoCloseable, AuditSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcSink.class);
private final ScheduledExecutorService sinkTimer =
Executors.newSingleThreadScheduledExecutor();
- private final ScheduledExecutorService partitionManagerTimer =
Executors.newSingleThreadScheduledExecutor();
private final DataQueue dataQueue;
private final int insertBatch;
private final int pullTimeOut;
private final SinkConfig sinkConfig;
private DataSource dataSource;
-
- private final DateTimeFormatter FORMATTER_YYMMDDHH =
DateTimeFormatter.ofPattern("yyyyMMdd");
- private final DateTimeFormatter FORMATTER_YY_MM_DD_HH =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
- private final String checkPartitionSql;
+ private final Configuration configuration;
public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
+ configuration = Configuration.getInstance();
this.dataQueue = dataQueue;
this.sinkConfig = sinkConfig;
- insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH,
+ insertBatch = configuration.get(KEY_SOURCE_DB_SINK_BATCH,
DEFAULT_SOURCE_DB_SINK_BATCH);
- pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT,
+ pullTimeOut = configuration.get(KEY_QUEUE_PULL_TIMEOUT,
DEFAULT_QUEUE_PULL_TIMEOUT);
- checkPartitionSql =
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL,
- DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL);
}
@@ -108,23 +75,11 @@ public class JdbcSink implements AutoCloseable {
*/
public void start() {
createDataSource();
-
sinkTimer.scheduleWithFixedDelay(this::process,
0,
- Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL,
+ configuration.get(KEY_SOURCE_DB_SINK_INTERVAL,
DEFAULT_SOURCE_DB_SINK_INTERVAL),
TimeUnit.MILLISECONDS);
- if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS,
- DEFAULT_ENABLE_MANAGE_PARTITIONS)) {
- // Create MySQL data partition of today
- addPartition(0);
-
-
partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions,
- 0,
-
Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS,
- DEFAULT_CHECK_PARTITION_INTERVAL_HOURS),
- TimeUnit.HOURS);
- }
}
/**
@@ -169,91 +124,12 @@ public class JdbcSink implements AutoCloseable {
* Create data source
*/
protected void createDataSource() {
- HikariConfig config = new HikariConfig();
- config.setDriverClassName(sinkConfig.getDriverClassName());
- config.setJdbcUrl(sinkConfig.getJdbcUrl());
- config.setUsername(sinkConfig.getUserName());
- config.setPassword(sinkConfig.getPassword());
-
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
- DEFAULT_CONNECTION_TIMEOUT));
- config.addDataSourceProperty(CACHE_PREP_STMTS,
- Configuration.getInstance().get(KEY_CACHE_PREP_STMTS,
DEFAULT_CACHE_PREP_STMTS));
- config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
- Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE,
DEFAULT_PREP_STMT_CACHE_SIZE));
- config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
- Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT,
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
- config.setMaximumPoolSize(
- Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
- DEFAULT_DATASOURCE_POOL_SIZE));
- dataSource = new HikariDataSource(config);
- }
-
- private void managePartitions() {
- // Create MySQL data partition of tomorrow
- addPartition(1);
-
- deletePartition();
- }
-
- private String formatPartitionName(LocalDate date) {
- return "p" + date.format(FORMATTER_YYMMDDHH);
- }
-
- private void addPartition(long daysToAdd) {
- String partitionName =
formatPartitionName(LocalDate.now().plusDays(daysToAdd));
- if (isPartitionExists(partitionName)) {
- LOGGER.info("Partition [{}] is exist, dont`t need add this
partition.", partitionName);
- return;
- }
- String partitionValue = LocalDate.now().plusDays(daysToAdd +
1).format(FORMATTER_YY_MM_DD_HH);
- String addPartitionSQL = String.format(
-
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
- DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL),
- partitionName, partitionValue);
- executeUpdate(addPartitionSQL);
- }
-
- private void deletePartition() {
- int daysToSubtract =
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS,
- DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
- String partitionName =
formatPartitionName(LocalDate.now().minusDays(daysToSubtract));
- if (!isPartitionExists(partitionName)) {
- LOGGER.info("Partition [{}] is not exist, dont`t need delete this
partition.", partitionName);
- return;
- }
- String deletePartitionSQL = String.format(
-
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
- DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL),
- partitionName);
- executeUpdate(deletePartitionSQL);
- }
-
- private void executeUpdate(String updateSQL) {
- try (Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(updateSQL)) {
- preparedStatement.executeUpdate();
- LOGGER.info("Execute update [{}] success!", updateSQL);
- } catch (Exception exception) {
- LOGGER.error("Execute update [{}] has exception!", updateSQL,
exception);
- }
- }
-
- private boolean isPartitionExists(String partitionName) {
- try (Connection connection = dataSource.getConnection();
- PreparedStatement statement =
connection.prepareStatement(checkPartitionSql)) {
- statement.setString(1, partitionName);
-
- try (ResultSet resultSet = statement.executeQuery()) {
- if (resultSet.next()) {
- return resultSet.getInt("count") > 0;
- }
- } catch (SQLException sqlException) {
- LOGGER.error("An error occurred while checking partition [{}]
existence:", partitionName, sqlException);
- }
- } catch (Exception exception) {
- LOGGER.error("An exception occurred while checking partition
[{}]existence:", partitionName, exception);
- }
- return false;
+ HikariConfig hikariConfig = JdbcUtils.buildHikariConfig(
+ sinkConfig.getDriverClassName(),
+ sinkConfig.getJdbcUrl(),
+ sinkConfig.getUserName(),
+ sinkConfig.getPassword());
+ dataSource = new HikariDataSource(hikariConfig);
}
public void destroy() {
@@ -261,7 +137,7 @@ public class JdbcSink implements AutoCloseable {
}
@Override
- public void close() throws Exception {
+ public void close() {
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index 511efc1f97..da009ac62e 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -24,6 +24,7 @@ import org.apache.inlong.audit.entities.StartEndTime;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.service.ConfigService;
import org.apache.inlong.audit.utils.CacheUtils;
+import org.apache.inlong.audit.utils.JdbcUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
@@ -51,26 +52,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_BACK_INITIAL_OFFSET;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_THREAD_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITIAL_OFFSET;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_THREAD_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.audit.entities.AuditCycle.DAY;
import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
@@ -181,23 +169,12 @@ public class JdbcSource {
* Create data source
*/
protected void createDataSource() {
- HikariConfig config = new HikariConfig();
- config.setDriverClassName(sourceConfig.getDriverClassName());
- config.setJdbcUrl(sourceConfig.getJdbcUrl());
- config.setUsername(sourceConfig.getUsername());
- config.setPassword(sourceConfig.getPassword());
-
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
- DEFAULT_CONNECTION_TIMEOUT));
- config.addDataSourceProperty(CACHE_PREP_STMTS,
- Configuration.getInstance().get(KEY_CACHE_PREP_STMTS,
DEFAULT_CACHE_PREP_STMTS));
- config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
- Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE,
DEFAULT_PREP_STMT_CACHE_SIZE));
- config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
- Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT,
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
- config.setMaximumPoolSize(
- Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
- DEFAULT_DATASOURCE_POOL_SIZE));
- dataSource = new HikariDataSource(config);
+ HikariConfig hikariConfig = JdbcUtils.buildHikariConfig(
+ sourceConfig.getDriverClassName(),
+ sourceConfig.getJdbcUrl(),
+ sourceConfig.getUserName(),
+ sourceConfig.getPassword());
+ dataSource = new HikariDataSource(hikariConfig);
}
/**
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
index fa629a725c..07f40e2e6b 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
@@ -20,13 +20,28 @@ package org.apache.inlong.audit.utils;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.JdbcConfig;
+import com.zaxxer.hikari.HikariConfig;
+
import java.util.Objects;
+import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
/**
* Jdbc utils
@@ -64,4 +79,26 @@ public class JdbcUtils {
userName,
password);
}
+
+ public static HikariConfig buildHikariConfig(String driverClassName,
String jdbcUrl, String userName,
+ String passWord) {
+ HikariConfig hikariConfig = new HikariConfig();
+ hikariConfig.setDriverClassName(driverClassName);
+ hikariConfig.setJdbcUrl(jdbcUrl);
+ hikariConfig.setUsername(userName);
+ hikariConfig.setPassword(passWord);
+ Configuration configuration = Configuration.getInstance();
+
hikariConfig.setConnectionTimeout(configuration.get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
+ DEFAULT_CONNECTION_TIMEOUT));
+ hikariConfig.addDataSourceProperty(CACHE_PREP_STMTS,
+ configuration.get(KEY_CACHE_PREP_STMTS,
DEFAULT_CACHE_PREP_STMTS));
+ hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
+ configuration.get(KEY_PREP_STMT_CACHE_SIZE,
DEFAULT_PREP_STMT_CACHE_SIZE));
+ hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+ configuration.get(KEY_PREP_STMT_CACHE_SQL_LIMIT,
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
+ hikariConfig.setMaximumPoolSize(
+ configuration.get(KEY_DATASOURCE_POOL_SIZE,
+ DEFAULT_DATASOURCE_POOL_SIZE));
+ return hikariConfig;
+ }
}
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index e9e114e2e0..6fb07021f5 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -92,7 +92,9 @@ CREATE TABLE IF NOT EXISTS `audit_data_day`
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY
(`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`)
) ENGINE = InnoDB
-DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table';
+DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table'
+PARTITION BY RANGE (to_days(`log_ts`))
+(PARTITION pDefault VALUES LESS THAN (TO_DAYS('1970-01-01')));
-- ----------------------------
-- Table structure for selector