This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 50539a5660 [INLONG-9943][Audit] Audit-service add codes of jdbc sink
(#9949)
50539a5660 is described below
commit 50539a566013f49a70499b57750e3fc27a099ebc
Author: doleyzi <[email protected]>
AuthorDate: Tue Apr 9 19:12:45 2024 +0800
[INLONG-9943][Audit] Audit-service add codes of jdbc sink (#9949)
* Add JDBC sink code to audit-service to aggregate data from the data
source and store the aggregated data in the target storage
* Use lombok.AllArgsConstructor annotation to structure SinkConfig
---
.../inlong/audit/config/ConfigConstants.java | 12 +-
.../apache/inlong/audit/config/SqlConstants.java | 10 ++
.../apache/inlong/audit/entities/SinkConfig.java | 36 ++++
.../apache/inlong/audit/service/EtlService.java | 196 +++++++++++++++++++++
.../org/apache/inlong/audit/sink/JdbcSink.java | 164 +++++++++++++++++
5 files changed, 412 insertions(+), 6 deletions(-)
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index bc4f0f9326..ecf64338e7 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -25,14 +25,14 @@ public class ConfigConstants {
// Source config
public static final String KEY_CLICKHOUSE_DRIVER = "clickhouse.driver";
public static final String DEFAULT_CLICKHOUSE_DRIVER =
"ru.yandex.clickhouse.ClickHouseDriver";
- public static final String KEY_CLICKHOUSE_URL = "clickhouse.url";
+ public static final String KEY_CLICKHOUSE_JDBC_URL = "clickhouse.jdbc.url";
public static final String KEY_CLICKHOUSE_USERNAME = "clickhouse.username";
public static final String KEY_CLICKHOUSE_PASSWORD = "clickhouse.password";
// DB config
public static final String KEY_MYSQL_DRIVER = "mysql.driver";
public static final String KEY_DEFAULT_MYSQL_DRIVER =
"com.mysql.cj.jdbc.Driver";
- public static final String KEY_MYSQL_URL = "mysql.url";
+ public static final String KEY_MYSQL_JDBC_URL = "mysql.jdbc.url";
public static final String KEY_MYSQL_USERNAME = "mysql.username";
public static final String KEY_MYSQL_PASSWORD = "mysql.password";
@@ -91,15 +91,15 @@ public class ConfigConstants {
public static final String DEFAULT_REALTIME_SUMMARY_SOURCE_TABLE =
"audit_data";
public static final String KEY_REALTIME_SUMMARY_SINK_TABLE =
"realtime.summary.sink.table";
public static final String DEFAULT_REALTIME_SUMMARY_SINK_TABLE =
"audit_data_temp";
- public static final String KEY_REALTIME_SUMMARY_BEFORE_TIMES =
"realtime.summary.before.times";
- public static final int DEFAULT_REALTIME_SUMMARY_BEFORE_TIMES = 6;
+ public static final String KEY_REALTIME_SUMMARY_STAT_BACK_TIMES =
"realtime.summary.stat.back.times";
+ public static final int DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES = 6;
public static final String KEY_DAILY_SUMMARY_SOURCE_TABLE =
"daily.summary.source.table";
public static final String DEFAULT_DAILY_SUMMARY_SOURCE_TABLE =
"audit_data_temp";
public static final String KEY_DAILY_SUMMARY_SINK_TABLE =
"daily.summary.sink.table";
public static final String DEFAULT_DAILY_SUMMARY_SINK_TABLE =
"audit_data_day";
- public static final String KEY_DAILY_SUMMARY_BEFORE_TIMES =
"daily.summary.before.times";
- public static final int DEFAULT_DAILY_SUMMARY_BEFORE_TIMES = 2;
+ public static final String KEY_DAILY_SUMMARY_STAT_BACK_TIMES =
"daily.summary.stat.back.times";
+ public static final int DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES = 2;
// HA selector config
public static final String KEY_RELEASE_LEADER_INTERVAL =
"release.leader.interval";
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index a64cd2234d..1eee2f7ec9 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -77,4 +77,14 @@ public class SqlConstants {
"AND audit_id = ? \n" +
"GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag";
+ // Mysql insert sql
+ public static final String KEY_MYSQL_SINK_INSERT_DAY_SQL =
"mysql.sink.insert.day.sql";
+ public static final String DEFAULT_MYSQL_SINK_INSERT_DAY_SQL =
+ "replace into audit_data_day (log_ts,inlong_group_id,
inlong_stream_id, audit_id,audit_tag,count, size, delay) "
+ + " values (?,?,?,?,?,?,?,?)";
+ public static final String KEY_MYSQL_SINK_INSERT_TEMP_SQL =
"mysql.sink.insert.temp.sql";
+ public static final String DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL =
+ "replace into audit_data_temp (log_ts,inlong_group_id,
inlong_stream_id, audit_id,audit_tag,count, size, delay) "
+ + " values (?,?,?,?,?,?,?,?)";
+
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
new file mode 100644
index 0000000000..d2e137ec83
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.entities;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Sink config: the insert SQL and the JDBC connection settings (driver class,
+ * JDBC URL, username, password) used by a sink.
+ */
+@Data
+@AllArgsConstructor
+public class SinkConfig {
+
+ // SQL statement the sink prepares for its batched inserts.
+ private String insertSql;
+ // JDBC connection settings handed to the connection pool.
+ private final String driverClassName;
+ private final String jdbcUrl;
+ private final String username;
+ private final String password;
+
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
new file mode 100644
index 0000000000..dada6cb7f2
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.channel.DataQueue;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.SinkConfig;
+import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.sink.JdbcSink;
+import org.apache.inlong.audit.source.JdbcSource;
+
+import java.util.Objects;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_DRIVER;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_JDBC_URL;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_PASSWORD;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_USERNAME;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DAILY_SUMMARY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_REALTIME_SUMMARY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_DAY_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_CLICKHOUSE_SOURCE_QUERY_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SINK_INSERT_DAY_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SINK_INSERT_TEMP_SQL;
+import static
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_TEMP_SQL;
+
+/**
+ * ETL service: aggregates data from the data sources and stores the aggregated data in the target storage.
+ *
+ * <p>Two pipelines are started, each with its own {@link DataQueue}: ClickHouse to the MySQL temporary table,
+ * and the MySQL temporary table to the MySQL day table.
+ */
+public class EtlService {
+
+    private JdbcSource mysqlSourceOfTemp;
+    private JdbcSink mysqlSinkOfDay;
+    private JdbcSource clickhouseSource;
+    private JdbcSink mysqlSinkOfTemp;
+    private final int queueSize;
+
+    public EtlService() {
+        queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
+                DEFAULT_DATA_QUEUE_SIZE);
+    }
+
+    /**
+     * Start the ETL service.
+     *
+     * @throws NullPointerException if a required JDBC setting is missing from the configuration
+     */
+    public void start() {
+        clickhouseToMysql();
+        mysqlToMysqlOfDay();
+    }
+
+    /**
+     * Aggregate data from the MySQL data source and store the aggregated data in the target MySQL table.
+     * The audit data cycle is days, and the result is stored in the day table.
+     */
+    private void mysqlToMysqlOfDay() {
+        DataQueue dataQueue = new DataQueue(queueSize);
+
+        mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig());
+        mysqlSourceOfTemp.start();
+
+        SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
+                DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
+        mysqlSinkOfDay = new JdbcSink(dataQueue, sinkConfig);
+        mysqlSinkOfDay.start();
+    }
+
+    /**
+     * Aggregate data from the ClickHouse data source and store the aggregated data in the target MySQL table.
+     * The audit data cycle is 5 minutes, and the result is stored in a temporary table.
+     */
+    private void clickhouseToMysql() {
+        DataQueue dataQueue = new DataQueue(queueSize);
+
+        clickhouseSource = new JdbcSource(dataQueue, buildClickhouseSourceConfig());
+        clickhouseSource.start();
+
+        SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
+                DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL));
+        mysqlSinkOfTemp = new JdbcSink(dataQueue, sinkConfig);
+        mysqlSinkOfTemp.start();
+    }
+
+    /**
+     * Read a configuration value that has no default and must be present.
+     *
+     * <p>An explicit check is used instead of {@code assert}, because assertions are disabled unless the JVM
+     * is started with {@code -ea}.
+     *
+     * @param key configuration key to look up
+     * @return the configured value, never {@code null}
+     * @throws NullPointerException if the key has no value; the message names the key
+     */
+    private static String requiredConfig(String key) {
+        String value = Configuration.getInstance().get(key);
+        return Objects.requireNonNull(value, "Missing required configuration: " + key);
+    }
+
+    /**
+     * Build the configuration of a MySQL sink.
+     *
+     * @param insertSql insert statement the sink will execute
+     * @return sink configuration carrying the insert SQL and the MySQL connection settings
+     * @throws NullPointerException if the MySQL JDBC URL, username or password is not configured
+     */
+    private SinkConfig buildMysqlSinkConfig(String insertSql) {
+        String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, KEY_DEFAULT_MYSQL_DRIVER);
+        Objects.requireNonNull(driver, "Missing required configuration: " + KEY_MYSQL_DRIVER);
+        String jdbcUrl = requiredConfig(KEY_MYSQL_JDBC_URL);
+        String userName = requiredConfig(KEY_MYSQL_USERNAME);
+        String passWord = requiredConfig(KEY_MYSQL_PASSWORD);
+
+        return new SinkConfig(
+                insertSql,
+                driver,
+                jdbcUrl,
+                userName,
+                passWord);
+    }
+
+    /**
+     * Build the configuration of the MySQL source that reads the temporary table with a daily audit cycle.
+     *
+     * @return source configuration carrying the query SQL, the stat-back times and the MySQL connection settings
+     * @throws NullPointerException if the MySQL JDBC URL, username or password is not configured
+     */
+    private SourceConfig buildMysqlSourceConfig() {
+        String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, KEY_DEFAULT_MYSQL_DRIVER);
+        Objects.requireNonNull(driver, "Missing required configuration: " + KEY_MYSQL_DRIVER);
+        String jdbcUrl = requiredConfig(KEY_MYSQL_JDBC_URL);
+        String userName = requiredConfig(KEY_MYSQL_USERNAME);
+        // The password is required here as well, matching the sink, which reads the same keys.
+        String passWord = requiredConfig(KEY_MYSQL_PASSWORD);
+
+        return new SourceConfig(AuditCycle.DAY,
+                Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
+                        DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
+                Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES,
+                        DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES),
+                driver,
+                jdbcUrl,
+                userName,
+                passWord);
+    }
+
+    /**
+     * Build the configuration of the ClickHouse source with a 5-minute audit cycle.
+     *
+     * @return source configuration carrying the query SQL, the stat-back times and the ClickHouse connection
+     *         settings
+     * @throws NullPointerException if the ClickHouse JDBC URL, username or password is not configured
+     */
+    private SourceConfig buildClickhouseSourceConfig() {
+        String driver = Configuration.getInstance().get(KEY_CLICKHOUSE_DRIVER, DEFAULT_CLICKHOUSE_DRIVER);
+        Objects.requireNonNull(driver, "Missing required configuration: " + KEY_CLICKHOUSE_DRIVER);
+        String jdbcUrl = requiredConfig(KEY_CLICKHOUSE_JDBC_URL);
+        String userName = requiredConfig(KEY_CLICKHOUSE_USERNAME);
+        String passWord = requiredConfig(KEY_CLICKHOUSE_PASSWORD);
+
+        return new SourceConfig(AuditCycle.MINUTE_5,
+                Configuration.getInstance().get(KEY_CLICKHOUSE_SOURCE_QUERY_SQL,
+                        DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL),
+                Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES,
+                        DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES),
+                driver,
+                jdbcUrl,
+                userName,
+                passWord);
+    }
+
+    /**
+     * Stop the ETL service and destroy the related resources.
+     *
+     * <p>Each component is checked for {@code null}, so this is safe to call when {@link #start()} was never
+     * invoked or failed before every component was created.
+     */
+    public void stop() {
+        if (mysqlSourceOfTemp != null) {
+            mysqlSourceOfTemp.destroy();
+        }
+        if (mysqlSinkOfDay != null) {
+            mysqlSinkOfDay.destroy();
+        }
+
+        if (clickhouseSource != null) {
+            clickhouseSource.destroy();
+        }
+        if (mysqlSinkOfTemp != null) {
+            mysqlSinkOfTemp.destroy();
+        }
+    }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
new file mode 100644
index 0000000000..99a6709826
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import org.apache.inlong.audit.channel.DataQueue;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.SinkConfig;
+import org.apache.inlong.audit.entities.StatData;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL;
+import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
+import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+
+/**
+ * JDBC sink: on a fixed-delay timer, pulls {@link StatData} records from a {@link DataQueue} and writes them
+ * to the target database in batches, using a HikariCP connection pool built from a {@link SinkConfig}.
+ */
+public class JdbcSink implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSink.class);
+    // Upper bound, in seconds, that destroy() waits for an in-flight batch before closing the pool.
+    private static final long DESTROY_WAIT_SECONDS = 5L;
+    private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor();
+    private final DataQueue dataQueue;
+    private final int insertBatch;
+    private final int pullTimeOut;
+    private final SinkConfig sinkConfig;
+    // Assigned by createDataSource()/destroy() on the caller's thread and read by the timer thread.
+    private volatile DataSource dataSource;
+
+    /**
+     * @param dataQueue queue the sink pulls records from
+     * @param sinkConfig insert SQL and JDBC connection settings
+     */
+    public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
+        this.dataQueue = dataQueue;
+        this.sinkConfig = sinkConfig;
+
+        insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH,
+                DEFAULT_SOURCE_DB_SINK_BATCH);
+
+        pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT,
+                DEFAULT_QUEUE_PULL_TIMEOUT);
+    }
+
+    /**
+     * Create the connection pool and schedule {@link #process()} with a fixed delay (in milliseconds) read
+     * from the configuration.
+     */
+    public void start() {
+        createDataSource();
+
+        sinkTimer.scheduleWithFixedDelay(this::process,
+                0,
+                Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL,
+                        DEFAULT_SOURCE_DB_SINK_INTERVAL),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Drain the queue: bind every pulled record to the insert statement and execute it in batches of
+     * {@code insertBatch}, flushing the remainder once the queue yields no more data.
+     */
+    private void process() {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(sinkConfig.getInsertSql())) {
+            int counter = 0;
+            // NOTE(review): the pull timeout is passed as MICROSECONDS; confirm this matches the unit of the
+            // configured queue pull timeout.
+            StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS);
+            while (data != null) {
+                preparedStatement.setString(1, data.getLogTs());
+                preparedStatement.setString(2, data.getInlongGroupId());
+                preparedStatement.setString(3, data.getInlongStreamId());
+                preparedStatement.setString(4, data.getAuditId());
+                preparedStatement.setString(5, data.getAuditTag());
+                preparedStatement.setLong(6, data.getCount());
+                preparedStatement.setLong(7, data.getSize());
+                preparedStatement.setLong(8, data.getDelay());
+                preparedStatement.addBatch();
+
+                if (++counter >= insertBatch) {
+                    preparedStatement.executeBatch();
+                    preparedStatement.clearBatch();
+                    counter = 0;
+                }
+                data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS);
+            }
+            if (counter > 0) {
+                preparedStatement.executeBatch();
+                preparedStatement.clearBatch();
+            }
+        } catch (Exception e) {
+            // Nothing may escape: an exception thrown by the task would cancel all later scheduled runs.
+            // The throwable is passed as the last argument so the stack trace is logged as well.
+            LOG.error("Process exception! {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Create the HikariCP data source from the sink config and the pool settings in the configuration.
+     * A previously created pool, if any, is closed after the new one is in place.
+     */
+    protected void createDataSource() {
+        HikariConfig config = new HikariConfig();
+        config.setDriverClassName(sinkConfig.getDriverClassName());
+        config.setJdbcUrl(sinkConfig.getJdbcUrl());
+        config.setUsername(sinkConfig.getUsername());
+        config.setPassword(sinkConfig.getPassword());
+        config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
+                DEFAULT_CONNECTION_TIMEOUT));
+        config.addDataSourceProperty(CACHE_PREP_STMTS,
+                Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS));
+        config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
+                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE));
+        config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
+        config.setMaximumPoolSize(
+                Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
+                        DEFAULT_DATASOURCE_POOL_SIZE));
+
+        DataSource previous = dataSource;
+        dataSource = new HikariDataSource(config);
+        closePool(previous);
+    }
+
+    /**
+     * Close the given pool if it is a {@link HikariDataSource}; does nothing for {@code null}.
+     */
+    private static void closePool(DataSource pool) {
+        if (pool instanceof HikariDataSource) {
+            ((HikariDataSource) pool).close();
+        }
+    }
+
+    /**
+     * Stop the timer, wait a bounded time for an in-flight batch to finish, then close the connection pool.
+     * Calling it again is harmless.
+     */
+    public void destroy() {
+        sinkTimer.shutdown();
+        try {
+            if (!sinkTimer.awaitTermination(DESTROY_WAIT_SECONDS, TimeUnit.SECONDS)) {
+                LOG.warn("Sink timer did not terminate within {} seconds, closing the data source anyway",
+                        DESTROY_WAIT_SECONDS);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for the caller and continue releasing the pool.
+            Thread.currentThread().interrupt();
+        }
+        DataSource current = dataSource;
+        dataSource = null;
+        closePool(current);
+    }
+
+    /**
+     * Release the timer and the connection pool; equivalent to {@link #destroy()}.
+     */
+    @Override
+    public void close() throws Exception {
+        destroy();
+    }
+}