This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d963e5c753 [INLONG-10099][Audit] Audit-store adds the general JDBC
sink capability (#10104)
d963e5c753 is described below
commit d963e5c7537a8685df6812bbcf1b2d3b1b0a41a4
Author: doleyzi <[email protected]>
AuthorDate: Mon Apr 29 18:00:19 2024 +0800
[INLONG-10099][Audit] Audit-store adds the general JDBC sink capability
(#10104)
* Audit-store adds the general JDBC sink capability
* Audit-store adds the general JDBC sink capability
* Modify configuration file of JDBC
* Modify configuration file of JDBC
* Modify configuration file of JDBC
* Adjust the import order
---
inlong-audit/audit-docker/Dockerfile | 7 +-
inlong-audit/audit-docker/audit-docker.sh | 26 ++-
.../config/{StoreConfig.java => JdbcConfig.java} | 47 +++--
.../apache/inlong/audit/config/StoreConfig.java | 3 +
.../entities/JdbcDataPo.java} | 55 ++---
.../audit/service/AuditMsgConsumerServer.java | 18 +-
.../inlong/audit/service/ClickHouseService.java | 7 +
.../inlong/audit/service/ElasticsearchService.java | 7 +
.../apache/inlong/audit/service/InsertData.java | 5 +
.../apache/inlong/audit/service/JdbcService.java | 224 +++++++++++++++++++++
.../apache/inlong/audit/service/MySqlService.java | 8 +
.../inlong/audit/service/consume/BaseConsume.java | 13 +-
.../audit/service/consume/PulsarConsume.java | 3 +-
inlong-audit/conf/application.properties | 6 +
inlong-audit/conf/audit-service.properties | 16 --
15 files changed, 366 insertions(+), 79 deletions(-)
diff --git a/inlong-audit/audit-docker/Dockerfile
b/inlong-audit/audit-docker/Dockerfile
index d55ac7e2f6..6717eca684 100644
--- a/inlong-audit/audit-docker/Dockerfile
+++ b/inlong-audit/audit-docker/Dockerfile
@@ -35,7 +35,7 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit"
ENV AUDIT_DBNAME="apache_inlong_audit"
# proxy/store/all, start audit module individually, or all
ENV START_MODE="all"
-# mysql / clickhouse / elasticsearch
+# mysql / clickhouse / elasticsearch / starrocks
ENV STORE_MODE=mysql
# mysql
ENV JDBC_URL=127.0.0.1:3306
@@ -52,6 +52,11 @@ ENV STORE_ES_PORT=9200
ENV STORE_ES_AUTHENABLE=false
ENV STORE_ES_USERNAME=elastic
ENV STORE_ES_PASSWD=inlong
+# starrocks
+ENV STORE_SR_URL=127.0.0.1:9030
+ENV STORE_SR_USERNAME=default
+ENV STORE_SR_PASSWD=default
+ENV STORE_SR_DBNAME="apache_inlong_audit"
# jvm
ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport
-XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0
-XX:-UseAdaptiveSizePolicy"
WORKDIR /opt/inlong-audit
diff --git a/inlong-audit/audit-docker/audit-docker.sh
b/inlong-audit/audit-docker/audit-docker.sh
index 42c43c9322..777a80b801 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -21,8 +21,12 @@ file_path=$(cd "$(dirname "$0")"/../;pwd)
store_conf_file=${file_path}/conf/application.properties
# proxy config
proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf
-sql_file="${file_path}"/sql/apache_inlong_audit.sql
+sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql
sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql
+sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql
+
+# audit-service config
+service_conf_file=${file_path}/conf/audit-service.properties
# replace the configuration for audit proxy
sed -i
"s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g"
"${store_conf_file}"
@@ -52,7 +56,7 @@ sed -i
"s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "$
sed -i
"s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g"
"${store_conf_file}"
sed -i
"s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g"
"${store_conf_file}"
# mysql file for audit
-sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_file}"
+sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}"
# clickhouse
sed -i
"s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g"
"${store_conf_file}"
sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g"
"${store_conf_file}"
@@ -66,6 +70,18 @@ sed -i
"s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHE
sed -i
"s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g"
"${store_conf_file}"
sed -i
"s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g"
"${store_conf_file}"
+# StarRocks SQL file for audit
+sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}"
+# StarRocks
+sed -i
"s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g"
"${store_conf_file}"
+sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g"
"${store_conf_file}"
+sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g"
"${store_conf_file}"
+
+# audit-service config
+sed -i
"s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g"
"${service_conf_file}"
+sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g"
"${service_conf_file}"
+sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g"
"${service_conf_file}"
+
# Whether the database table exists. If it does not exist, initialize the
database and skip if it exists.
if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then
datasource_hostname=${BASH_REMATCH[1]}
@@ -96,6 +112,12 @@ fi
if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then
bash +x ./bin/store-start.sh
fi
+
+# start service
+if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "service" ]; then
+ bash +x ./bin/service-start.sh
+fi
+
sleep 3
# keep alive
tail -F ./logs/info.log
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
similarity index 57%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
index 41ab00da8b..e7f3f639cd 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
@@ -17,29 +17,28 @@
package org.apache.inlong.audit.config;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-@Getter
-@Setter
-public class StoreConfig {
-
- @Value("${audit.config.store.mode:mysql}")
- private String store;
-
- public boolean isMysqlStore() {
- return store.contains("mysql");
- }
-
- public boolean isElasticsearchStore() {
- return store.contains("elasticsearch");
- }
-
- public boolean isClickHouseStore() {
- return store.contains("clickhouse");
- }
-
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Data
+public class JdbcConfig {
+
+ @Value("${jdbc.driver:com.mysql.cj.jdbc.Driver}")
+ private String driver;
+ @Value("${jdbc.url}")
+ private String url;
+ @Value("${jdbc.username}")
+ private String userName;
+ @Value("${jdbc.password}")
+ private String password;
+ @Value("${jdbc.batchIntervalMs:1000}")
+ private int batchIntervalMs;
+ @Value("${jdbc.batchThreshold:500}")
+ private int batchThreshold;
+ @Value("${jdbc.processIntervalMs:100}")
+ private int processIntervalMs;
+ @Value("${data.queue.size:1000000}")
+ private int dataQueueSize;
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
index 41ab00da8b..c4a2db3328 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
@@ -42,4 +42,7 @@ public class StoreConfig {
return store.contains("clickhouse");
}
+ public boolean isJdbc() {
+ return store.contains("jdbc");
+ }
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
similarity index 52%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
index 41ab00da8b..ebc42f4a53 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
@@ -15,31 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-@Getter
-@Setter
-public class StoreConfig {
-
- @Value("${audit.config.store.mode:mysql}")
- private String store;
-
- public boolean isMysqlStore() {
- return store.contains("mysql");
- }
-
- public boolean isElasticsearchStore() {
- return store.contains("elasticsearch");
- }
-
- public boolean isClickHouseStore() {
- return store.contains("clickhouse");
- }
-
+package org.apache.inlong.audit.db.entities;
+
+import lombok.Data;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.sql.Timestamp;
+
+@Data
+public class JdbcDataPo {
+
+ private String ip;
+ private String dockerId;
+ private String threadId;
+ private Timestamp sdkTs;
+ private Long packetId;
+ private Timestamp logTs;
+ private String inLongGroupId;
+ private String inLongStreamId;
+ private String auditId;
+ private String auditTag;
+ private long auditVersion;
+ private Long count;
+ private Long size;
+ private Long delay;
+ private Timestamp updateTime;
+ private Consumer<byte[]> consumer;
+ private MessageId messageId;
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 04796a3b4b..2e2f4d9469 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -18,6 +18,7 @@
package org.apache.inlong.audit.service;
import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.config.JdbcConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.consts.ConfigConstants;
@@ -68,16 +69,14 @@ public class AuditMsgConsumerServer implements
InitializingBean {
private ClickHouseConfig chConfig;
// ClickHouseService
private ClickHouseService ckService;
-
+ @Autowired
+ private JdbcConfig jdbcConfig;
+ private JdbcService jdbcService;
private static final String DEFAULT_CONFIG_PROPERTIES =
"application.properties";
-
// interval time of getting mq config
private static final int INTERVAL_MS = 5000;
-
private final CloseableHttpClient httpClient =
HttpClientBuilder.create().build();
-
private final Gson gson = new Gson();
-
/**
* Initializing bean
*/
@@ -105,13 +104,15 @@ public class AuditMsgConsumerServer implements
InitializingBean {
if (mqConsume == null) {
LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
}
-
if (storeConfig.isElasticsearchStore()) {
esService.startTimerRoutine();
}
if (storeConfig.isClickHouseStore()) {
ckService.start();
}
+ if (storeConfig.isJdbc()) {
+ jdbcService.start();
+ }
mqConsume.start();
}
@@ -133,6 +134,11 @@ public class AuditMsgConsumerServer implements
InitializingBean {
ckService = new ClickHouseService(chConfig);
insertServiceList.add(ckService);
}
+ if (storeConfig.isJdbc()) {
+ // create jdbc object
+ jdbcService = new JdbcService(jdbcConfig);
+ insertServiceList.add(jdbcService);
+ }
return insertServiceList;
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index ad257ba372..47c63aa395 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -21,6 +21,8 @@ import org.apache.inlong.audit.config.ClickHouseConfig;
import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,6 +199,11 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
}
}
+ @Override
+ public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
+
+ }
+
/**
* close
*
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
index 02a23c3a8c..3de234146f 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
@@ -24,6 +24,8 @@ import org.apache.inlong.audit.protocol.AuditData;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -278,4 +280,9 @@ public class ElasticsearchService implements InsertData,
AutoCloseable {
esPo.setPacketId(msgBody.getPacketId());
this.insertData(esPo);
}
+
+ @Override
+ public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
+
+ }
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
index 478920e3a2..7f9bcf8207 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
@@ -19,6 +19,9 @@ package org.apache.inlong.audit.service;
import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+
/**
* Insert Data interface
*/
@@ -28,4 +31,6 @@ public interface InsertData {
* insert audit data to storage.
*/
void insert(AuditData msgBody);
+
+ void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId);
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
new file mode 100644
index 0000000000..7aaac09e64
--- /dev/null
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.config.JdbcConfig;
+import org.apache.inlong.audit.db.entities.JdbcDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a general jdbc sink service. As long as it meets the jdbc protocol,
you can use this service.
+ */
+public class JdbcService implements InsertData, AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcService.class);
+ private static final String INSERT_SQL = "insert into audit_data (ip,
docker_id, thread_id, \r\n"
+ + " sdk_ts, packet_id, log_ts, \r\n"
+ + " inlong_group_id, inlong_stream_id, audit_id, audit_tag,
audit_version, \r\n"
+ + " count, size, delay, \r\n"
+ + " update_time)\r\n"
+ + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+ private final JdbcConfig jdbcConfig;
+
+ private final ScheduledExecutorService timerService =
Executors.newSingleThreadScheduledExecutor();
+ private LinkedBlockingQueue<JdbcDataPo> receiveQueue;
+ private long lastCheckTime = System.currentTimeMillis();
+ private Connection connection;
+ private final List<JdbcDataPo> writeDataList = new LinkedList<>();
+
+ public JdbcService(JdbcConfig jdbcConfig) {
+ this.jdbcConfig = jdbcConfig;
+ }
+
+ /**
+ * start
+ */
+ public void start() {
+ receiveQueue = new
LinkedBlockingQueue<>(jdbcConfig.getDataQueueSize());
+ try {
+ Class.forName(jdbcConfig.getDriver());
+ reconnect();
+ } catch (Exception e) {
+ LOG.error("Start failure!", e);
+ }
+ timerService.scheduleWithFixedDelay(this::process,
+ jdbcConfig.getProcessIntervalMs(),
+ jdbcConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private void process() {
+ if (receiveQueue.size() < jdbcConfig.getBatchThreshold()
+ && (System.currentTimeMillis() - lastCheckTime <
jdbcConfig.getBatchIntervalMs())) {
+ return;
+ }
+ lastCheckTime = System.currentTimeMillis();
+
+ if (writeDataList.size() > 0) {
+ if (executeBatch(writeDataList)) {
+ acknowledge(writeDataList);
+ writeDataList.clear();
+ } else {
+ return;
+ }
+ }
+
+ JdbcDataPo data = receiveQueue.poll();
+ while (data != null) {
+ writeDataList.add(data);
+ if (writeDataList.size() > jdbcConfig.getBatchThreshold()) {
+ if (executeBatch(writeDataList)) {
+ acknowledge(writeDataList);
+ writeDataList.clear();
+ } else {
+ break;
+ }
+ }
+ data = receiveQueue.poll();
+ }
+ }
+
+ private boolean executeBatch(List<JdbcDataPo> dataList) {
+ boolean result = false;
+ try (PreparedStatement statement =
connection.prepareStatement(INSERT_SQL)) {
+ for (JdbcDataPo data : dataList) {
+ statement.setString(1, data.getIp());
+ statement.setString(2, data.getDockerId());
+ statement.setString(3, data.getThreadId());
+ statement.setTimestamp(4, data.getSdkTs());
+ statement.setLong(5, data.getPacketId());
+ statement.setTimestamp(6, data.getLogTs());
+ statement.setString(7, data.getInLongGroupId());
+ statement.setString(8, data.getInLongStreamId());
+ statement.setString(9, data.getAuditId());
+ statement.setString(10, data.getAuditTag());
+ statement.setLong(11, data.getAuditVersion());
+ statement.setLong(12, data.getCount());
+ statement.setLong(13, data.getSize());
+ statement.setLong(14, data.getDelay());
+ statement.setTimestamp(15, data.getUpdateTime());
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ connection.commit();
+ result = true;
+ } catch (Exception exception) {
+ LOG.error("Execute batch has failure!", exception);
+ try {
+ reconnect();
+ } catch (SQLException sqlException) {
+ LOG.error("Re-connect has failure!", sqlException);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * reconnect
+ *
+ * @throws SQLException Exception when creating connection.
+ */
+ private void reconnect() throws SQLException {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ LOG.error("Reconnect has exception!", e);
+ }
+ connection = null;
+ }
+ connection = DriverManager.getConnection(jdbcConfig.getUrl(),
jdbcConfig.getUserName(),
+ jdbcConfig.getPassword());
+ connection.setAutoCommit(false);
+ }
+
+ /**
+ * insert
+ *
+ * @param msgBody audit data reading from Pulsar or other MessageQueue.
+ */
+ @Override
+ public void insert(AuditData msgBody) {
+ }
+
+ @Override
+ public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
+ JdbcDataPo data = new JdbcDataPo();
+ data.setConsumer(consumer);
+ data.setMessageId(messageId);
+ data.setIp(msgBody.getIp());
+ data.setThreadId(msgBody.getThreadId());
+ data.setDockerId(msgBody.getDockerId());
+ data.setPacketId(msgBody.getPacketId());
+ data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
+ data.setLogTs(new Timestamp(msgBody.getLogTs()));
+ data.setAuditId(msgBody.getAuditId());
+ data.setAuditTag(msgBody.getAuditTag());
+ data.setAuditVersion(msgBody.getAuditVersion());
+ data.setCount(msgBody.getCount());
+ data.setDelay(msgBody.getDelay());
+ data.setInLongGroupId(msgBody.getInlongGroupId());
+ data.setInLongStreamId(msgBody.getInlongStreamId());
+ data.setSize(msgBody.getSize());
+ data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
+ try {
+ receiveQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException exception) {
+ LOG.error("Insert data has InterruptedException ", exception);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.connection.close();
+ this.timerService.shutdown();
+ }
+
+ private void acknowledge(List<JdbcDataPo> dataList) {
+ Iterator<JdbcDataPo> iterator = dataList.iterator();
+ while (iterator.hasNext()) {
+ JdbcDataPo jdbcData = iterator.next();
+ try {
+ if (jdbcData.getConsumer() != null && jdbcData.getMessageId()
!= null) {
+
jdbcData.getConsumer().acknowledge(jdbcData.getMessageId());
+ }
+ iterator.remove();
+ } catch (Exception exception) {
+ LOG.error("Acknowledge has exception!", exception);
+ }
+ }
+ }
+}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
index bedc40cef9..21f471d003 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
@@ -21,6 +21,9 @@ import org.apache.inlong.audit.db.dao.AuditDataDao;
import org.apache.inlong.audit.db.entities.AuditDataPo;
import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+
import java.util.Date;
/**
@@ -54,4 +57,9 @@ public class MySqlService implements InsertData {
dao.insert(po);
}
+ @Override
+ public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
+
+ }
+
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
index 0f8a5b104a..e0fbb7180a 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
@@ -23,6 +23,8 @@ import org.apache.inlong.audit.protocol.AuditData;
import org.apache.inlong.audit.service.InsertData;
import com.google.gson.Gson;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,5 +64,14 @@ public abstract class BaseConsume {
}
});
}
-
+ protected void handleMessage(String body, Consumer<byte[]> consumer,
MessageId messageId) {
+ AuditData msgBody = gson.fromJson(body, AuditData.class);
+ this.insertServiceList.forEach((service) -> {
+ try {
+ service.insert(msgBody, consumer, messageId);
+ } catch (Exception e) {
+ LOG.error("Handle message has exception!", e);
+ }
+ });
+ }
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
index c73fc7319b..7d8efc79ce 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
@@ -129,8 +129,7 @@ public class PulsarConsume extends BaseConsume {
public void received(Consumer<byte[]> consumer,
Message<byte[]> msg) {
try {
String body = new String(msg.getData(),
StandardCharsets.UTF_8);
- handleMessage(body);
- consumer.acknowledge(msg);
+ handleMessage(body, consumer,
msg.getMessageId());
} catch (Exception e) {
LOG.error("Consumer has exception topic
{}, subName {}, ex {}",
topic,
diff --git a/inlong-audit/conf/application.properties
b/inlong-audit/conf/application.properties
index c76942da90..d721e375cb 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -96,3 +96,9 @@ clickhouse.batchIntervalMs=1000
clickhouse.batchThreshold=500
clickhouse.processIntervalMs=100
+# Generic jdbc storage
+jdbc.driver=com.mysql.cj.jdbc.Driver
+jdbc.url=jdbc:mysql://127.0.0.1:9020/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
+jdbc.username=*******
+jdbc.password=********
+
diff --git a/inlong-audit/conf/audit-service.properties
b/inlong-audit/conf/audit-service.properties
index 66fa9e0fd5..4d7c144ff1 100644
--- a/inlong-audit/conf/audit-service.properties
+++ b/inlong-audit/conf/audit-service.properties
@@ -16,25 +16,9 @@
# specific language governing permissions and limitations
# under the License.
-# clickhouse config
-clickhouse.jdbc.url=jdbc:clickhouse://*****:***/db_inlong_audit?socket_timeout=600000
-clickhouse.username=*****
-clickhouse.password=*****
-
# mysql config
mysql.jdbc.url=jdbc:mysql://*****:***/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true
mysql.username=*****
mysql.password=*****
-# summary config
-summary.realtime.stat.back.times=6
-summary.daily.stat.back.times=2
-audit.ids=3;4;5;6
-
-# api config
-api.cache.max.size=50000000
-api.cache.expired.hours=12
-api.real.limiter.qps=1000.0
-api.pool.size=10
-api.backlog.size=100