This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aa674dc79d [INLONG-10132][Audit] Clean up ElasticSearch and ClickHouse
related code of audit-store (#10134)
aa674dc79d is described below
commit aa674dc79de76aa49412743b0318bc21c54e3fef
Author: doleyzi <[email protected]>
AuthorDate: Wed May 8 10:29:49 2024 +0800
[INLONG-10132][Audit] Clean up ElasticSearch and ClickHouse related code of
audit-store (#10134)
---
inlong-audit/audit-docker/Dockerfile | 27 +-
inlong-audit/audit-docker/audit-docker.sh | 61 ++---
inlong-audit/audit-service/pom.xml | 6 -
.../apache/inlong/audit/cache/RealTimeQuery.java | 23 +-
.../inlong/audit/config/OpenApiConstants.java | 2 +-
.../inlong/audit/config/ClickHouseConfig.java | 50 ----
.../inlong/audit/config/DataServerConfig.java | 47 ----
.../inlong/audit/config/ElasticsearchConfig.java | 125 ---------
.../apache/inlong/audit/config/StoreConfig.java | 15 +-
.../org/apache/inlong/audit/db/DruidConfig.java | 80 ------
.../inlong/audit/db/DruidDataSourceProperties.java | 47 ----
.../apache/inlong/audit/db/dao/AuditDataDao.java | 28 --
.../inlong/audit/db/entities/AuditDataPo.java | 46 ----
.../inlong/audit/db/entities/ClickHouseDataPo.java | 45 ----
.../apache/inlong/audit/db/entities/ESDataPo.java | 49 ----
.../audit/service/AuditMsgConsumerServer.java | 27 --
.../inlong/audit/service/ClickHouseService.java | 217 ----------------
.../inlong/audit/service/ElasticsearchService.java | 288 ---------------------
.../apache/inlong/audit/service/MySqlService.java | 65 -----
.../src/main/resources/mapper/AuditDataDao.xml | 51 ----
.../audit/service/ElasticsearchServiceTest.java | 96 -------
.../audit/service/consume/KafkaConsumeTest.java | 15 +-
.../audit/service/consume/TubeConsumeTest.java | 17 +-
.../src/test/resources/mapper/AuditDataDao.xml | 47 ----
inlong-audit/conf/application.properties | 62 +----
inlong-audit/conf/mapper/AuditDataDao.xml | 47 ----
.../sql/apache_inlong_audit_clickhouse.sql | 47 ----
inlong-audit/sql/apache_inlong_audit_mysql.sql | 6 +-
28 files changed, 64 insertions(+), 1572 deletions(-)
diff --git a/inlong-audit/audit-docker/Dockerfile
b/inlong-audit/audit-docker/Dockerfile
index 6717eca684..e578b98c4b 100644
--- a/inlong-audit/audit-docker/Dockerfile
+++ b/inlong-audit/audit-docker/Dockerfile
@@ -35,28 +35,11 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit"
ENV AUDIT_DBNAME="apache_inlong_audit"
# proxy/store/all, start audit module individually, or all
ENV START_MODE="all"
-# mysql / clickhouse / elasticsearch / starrocks
-ENV STORE_MODE=mysql
-# mysql
-ENV JDBC_URL=127.0.0.1:3306
-ENV USERNAME=root
-ENV PASSWORD=inlong
-# clickhouse
-ENV STORE_CK_URL=127.0.0.1:8123
-ENV STORE_CK_USERNAME=default
-ENV STORE_CK_PASSWD=default
-ENV STORE_CK_DBNAME="apache_inlong_audit"
-# elasticsearch
-ENV STORE_ES_HOST=127.0.0.1
-ENV STORE_ES_PORT=9200
-ENV STORE_ES_AUTHENABLE=false
-ENV STORE_ES_USERNAME=elastic
-ENV STORE_ES_PASSWD=inlong
-# starrocks
-ENV STORE_SR_URL=127.0.0.1:9030
-ENV STORE_SR_USERNAME=default
-ENV STORE_SR_PASSWD=default
-ENV STORE_SR_DBNAME="apache_inlong_audit"
+# MySQL / StarRocks
+ENV AUDIT_JDBC_URL=127.0.0.1:3306
+ENV AUDIT_JDBC_USERNAME=root
+ENV AUDIT_JDBC_PASSWORD=inlong
+
# jvm
ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport
-XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0
-XX:-UseAdaptiveSizePolicy"
WORKDIR /opt/inlong-audit
diff --git a/inlong-audit/audit-docker/audit-docker.sh
b/inlong-audit/audit-docker/audit-docker.sh
index 71c71f5545..cdc6c103cf 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -17,18 +17,20 @@
#
file_path=$(cd "$(dirname "$0")"/../;pwd)
-# store config
-store_conf_file=${file_path}/conf/application.properties
+
+#SQL file
+sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql
+
# proxy config
proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf
-sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql
-sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql
-sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql
+
+# store config
+store_conf_file=${file_path}/conf/application.properties
# audit-service config
service_conf_file=${file_path}/conf/audit-service.properties
-# replace the configuration for audit proxy
+# replace the configuration for audit-proxy
sed -i
"s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g"
"${store_conf_file}"
sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g"
"${store_conf_file}"
if [ "${MQ_TYPE}" = "pulsar" ]; then
@@ -47,51 +49,29 @@ if [ "${MQ_TYPE}" = "tubemq" ]; then
sed -i "s/agent1.sinks.tube-sink-msg2.topic =
.*$/agent1.sinks.tube-sink-msg2.topic = ${TUBE_AUDIT_TOPIC}/g"
"${proxy_conf_file}"
fi
-# replace the configuration for audit store
-if [ -n "${STORE_MODE}" ]; then
- sed -i
"s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g"
"${store_conf_file}"
-fi
-# DB
-sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g"
"${store_conf_file}"
-sed -i
"s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g"
"${store_conf_file}"
-sed -i
"s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g"
"${store_conf_file}"
-# mysql file for audit
+# replace the audit db name for audit sql file
sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}"
-# clickhouse
-sed -i
"s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g"
"${store_conf_file}"
-sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g"
"${store_conf_file}"
-sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g"
"${store_conf_file}"
-# mysql file for clickhouse
-sed -i "s/apache_inlong_audit/${STORE_CK_DBNAME}/g" "${sql_ck_file}"
-# elasticsearch
-sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g"
"${store_conf_file}"
-sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g"
"${store_conf_file}"
-sed -i
"s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHENABLE}/g"
"${store_conf_file}"
-sed -i
"s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g"
"${store_conf_file}"
-sed -i
"s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g"
"${store_conf_file}"
-# StarRocks SQL file for audit
-sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}"
-# StarRocks
-sed -i
"s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g"
"${store_conf_file}"
-sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g"
"${store_conf_file}"
-sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g"
"${store_conf_file}"
+# replace the configuration for audit-store
+sed -i
"s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g"
"${store_conf_file}"
+sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_JDBC_USERNAME}/g"
"${store_conf_file}"
+sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_JDBC_PASSWORD}/g"
"${store_conf_file}"
-# audit-service config
-sed -i
"s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g"
"${service_conf_file}"
-sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g"
"${service_conf_file}"
-sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g"
"${service_conf_file}"
+# replace the configuration for audit-service
+sed -i
"s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g"
"${service_conf_file}"
+sed -i
"s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g"
"${service_conf_file}"
+sed -i
"s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g"
"${service_conf_file}"
# Whether the database table exists. If it does not exist, initialize the
database and skip if it exists.
-if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then
+if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then
datasource_hostname=${BASH_REMATCH[1]}
datasource_port=${BASH_REMATCH[2]}
select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE
table_schema = 'apache_inlong_audit'"
- inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port}
-u${USERNAME} -p${PASSWORD} -e "${select_db_sql}")
+ inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port}
-u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} -e "${select_db_sql}")
inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]")
if [ "${inlong_num}" = 0 ]; then
- mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME}
-p${PASSWORD} < sql/apache_inlong_audit_mysql.sql
+ mysql -h${datasource_hostname} -P${datasource_port}
-u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} <
sql/apache_inlong_audit_mysql.sql
fi
fi
@@ -108,6 +88,7 @@ if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" =
"proxy" ]; then
bash +x ./bin/proxy-start.sh tubemq
fi
fi
+
# start store
if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then
bash +x ./bin/store-start.sh
diff --git a/inlong-audit/audit-service/pom.xml
b/inlong-audit/audit-service/pom.xml
index e71c33586b..2a67f41afc 100644
--- a/inlong-audit/audit-service/pom.xml
+++ b/inlong-audit/audit-service/pom.xml
@@ -86,12 +86,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>audit-common</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
</dependencies>
<build>
<resources>
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
index ff9de5c6d5..6eea81ec9d 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
@@ -39,6 +39,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
@@ -48,6 +51,8 @@ import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETE
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
@@ -68,8 +73,12 @@ public class RealTimeQuery {
private final String queryLogTsSql;
private final String queryIdsByIpSql;
private final String queryReportIpsSql;
+ private final ExecutorService executor =
+ Executors.newFixedThreadPool(
+ Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE,
DEFAULT_API_THREAD_POOL_SIZE));
private RealTimeQuery() {
+
List<JdbcConfig> jdbcConfigList =
ConfigService.getInstance().getAllAuditSource();
for (JdbcConfig jdbcConfig : jdbcConfigList) {
BasicDataSource dataSource = new BasicDataSource();
@@ -126,7 +135,7 @@ public class RealTimeQuery {
public List<StatData> queryLogTs(String startTime, String endTime, String
inlongGroupId,
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
- List<StatData> statDataList = new LinkedList<>();
+ List<StatData> statDataList = new CopyOnWriteArrayList<>();
if (dataSourceList.isEmpty()) {
return statDataList;
}
@@ -136,11 +145,11 @@ public class RealTimeQuery {
List<StatData> statDataListTemp =
doQueryLogTs(dataSource, startTime, endTime,
inlongGroupId, inlongStreamId, auditId);
statDataList.addAll(statDataListTemp);
- });
+ }, executor);
futures.add(future);
}
- CompletableFuture.allOf(futures.toArray(new
CompletableFuture[futures.size()])).join();
- LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms",
startTime, endTime, inlongGroupId,
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
+ LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {}
ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() -
currentTime);
return filterMaxAuditVersion(statDataList);
}
@@ -165,8 +174,7 @@ public class RealTimeQuery {
for (Map.Entry<String, List<StatData>> entry : allData.entrySet()) {
long maxAuditVersion = Long.MIN_VALUE;
for (StatData maxData : entry.getValue()) {
- maxAuditVersion =
- maxData.getAuditVersion() > maxAuditVersion ?
maxData.getAuditVersion() : maxAuditVersion;
+ maxAuditVersion = Math.max(maxData.getAuditVersion(),
maxAuditVersion);
}
for (StatData statData : entry.getValue()) {
if (statData.getAuditVersion() == maxAuditVersion) {
@@ -191,6 +199,7 @@ public class RealTimeQuery {
*/
private List<StatData> doQueryLogTs(DataSource dataSource, String
startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId) {
+ long currentTime = System.currentTimeMillis();
List<StatData> result = new LinkedList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement pstat =
connection.prepareStatement(queryLogTsSql)) {
@@ -219,6 +228,8 @@ public class RealTimeQuery {
} catch (Exception exception) {
LOGGER.error("Query log time has exception!, datasource={} ",
dataSource, exception);
}
+ LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms",
startTime, endTime, inlongGroupId,
+ inlongStreamId, auditId, System.currentTimeMillis() -
currentTime);
return result;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index 05643c43bf..5e186716e5 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -45,7 +45,7 @@ public class OpenApiConstants {
public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;
public static final String KEY_API_CACHE_EXPIRED_HOURS =
"api.cache.expired.hours";
- public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
+ public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 18;
// Http config
public static final String PARAMS_START_TIME = "startTime";
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
deleted file mode 100644
index 3ed02e69a1..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-@Getter
-@Setter
-public class ClickHouseConfig {
-
- @Value("${clickhouse.driver}")
- private String driver;
-
- @Value("${clickhouse.url}")
- private String url;
-
- @Value("${clickhouse.username}")
- private String username;
-
- @Value("${clickhouse.password}")
- private String password;
-
- @Value("${clickhouse.batchIntervalMs:1000}")
- private int batchIntervalMs;
-
- @Value("${clickhouse.batchThreshold:500}")
- private int batchThreshold;
-
- @Value("${clickhouse.processIntervalMs:100}")
- private int processIntervalMs;
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java
deleted file mode 100644
index c366e6cd45..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.config;
-
-import org.mybatis.spring.annotation.MapperScan;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.ComponentScan.Filter;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.FilterType;
-import org.springframework.context.annotation.PropertySource;
-import org.springframework.context.annotation.PropertySources;
-import
org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
-import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
-
-@Configuration
-@ComponentScan(basePackages = "org.apache.inlong.audit", useDefaultFilters =
false, includeFilters = {
- @Filter(type = FilterType.ANNOTATION, value = Component.class),
- @Filter(type = FilterType.ANNOTATION, value = Service.class)})
-@MapperScan(basePackages = "org.apache.inlong.audit.db.dao")
-@PropertySources({
- @PropertySource("classpath:application.properties"),
-})
-
-public class DataServerConfig {
-
- @Bean
- public static PropertySourcesPlaceholderConfigurer
propertyPlaceholderConfigurer() {
- return new PropertySourcesPlaceholderConfigurer();
- }
-}
\ No newline at end of file
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java
deleted file mode 100644
index 9b8e197f35..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-@Configuration
-@Getter
-@Setter
-public class ElasticsearchConfig {
-
- @Value("${elasticsearch.host}")
- private String host;
-
- @Value("${elasticsearch.port}")
- private int port;
-
- @Value("${elasticsearch.connTimeout:3000}")
- private int connTimeout;
-
- @Value("${elasticsearch.socketTimeout:5000}")
- private int socketTimeout;
-
- @Value("${elasticsearch.connectionRequestTimeout:500}")
- private int connectionRequestTimeout;
-
- @Value("${elasticsearch.authEnable:false}")
- private boolean authEnable;
-
- @Value("${elasticsearch.username}")
- private String username;
-
- @Value("${elasticsearch.password}")
- private String password;
-
- @Value("${elasticsearch.shardsNum:5}")
- private int shardsNum;
-
- @Value("${elasticsearch.replicaNum:1}")
- private int replicaNum;
-
- @Value("${elasticsearch.indexDeleteDay:5}")
- private int indexDeleteDay;
-
- @Value("${elasticsearch.enableCustomDocId:true}")
- private boolean enableCustomDocId;
-
- @Value("${elasticsearch.bulkInterval:10}")
- private int bulkInterval;
-
- @Value("${elasticsearch.bulkThreshold:5000}")
- private int bulkThreshold;
-
- @Value("${elasticsearch.auditIdSet}")
- private String auditIdSet;
-
- @Bean(destroyMethod = "close", name = "restClient")
- public RestHighLevelClient initRestClient() {
-
- // support es cluster with multi hosts
- List<HttpHost> hosts = new ArrayList<>();
- String[] hostArrays = host.split(",");
- for (String host : hostArrays) {
- if (StringUtils.isNotEmpty(host)) {
- hosts.add(new HttpHost(host.trim(), port, "http"));
- }
- }
-
- RestClientBuilder restClientBuilder =
RestClient.builder(hosts.toArray(new HttpHost[0]));
-
- // configurable auth
- if (authEnable) {
- final CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY, new
UsernamePasswordCredentials(username, password));
-
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder
- .setDefaultCredentialsProvider(credentialsProvider));
- }
-
- restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
- .setConnectTimeout(connTimeout).setSocketTimeout(socketTimeout)
- .setConnectionRequestTimeout(connectionRequestTimeout));
-
- return new RestHighLevelClient(restClientBuilder);
- }
-
- public List<String> getAuditIdList() {
- List<String> auditIdList = new ArrayList<>();
- if (!StringUtils.isEmpty(auditIdSet)) {
- auditIdList = Arrays.asList(auditIdSet.split(","));
- }
- return auditIdList;
- }
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
index c4a2db3328..ca3358701e 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
@@ -27,21 +27,10 @@ import org.springframework.stereotype.Component;
@Setter
public class StoreConfig {
- @Value("${audit.config.store.mode:mysql}")
+ // Supports common JDBC protocol. Such as mysql / StarRocks
+ @Value("${audit.config.store.mode:jdbc}")
private String store;
- public boolean isMysqlStore() {
- return store.contains("mysql");
- }
-
- public boolean isElasticsearchStore() {
- return store.contains("elasticsearch");
- }
-
- public boolean isClickHouseStore() {
- return store.contains("clickhouse");
- }
-
public boolean isJdbc() {
return store.contains("jdbc");
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java
deleted file mode 100644
index 1e8fc91619..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.db;
-
-import org.apache.inlong.audit.config.StoreConfig;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import
org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import javax.sql.DataSource;
-
-import java.sql.SQLException;
-
-@Configuration
-@EnableConfigurationProperties({DruidDataSourceProperties.class})
-public class DruidConfig {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(DruidConfig.class);
-
- @Autowired
- protected StoreConfig storeConfig;
- @Autowired
- private DruidDataSourceProperties properties;
-
- @Bean
- @ConditionalOnMissingBean
- public DataSource druidDataSource() {
- LOGGER.info("druidDataSource url = {} ", properties.getUrl());
- DruidDataSource druidDataSource = new DruidDataSource();
- druidDataSource.setDriverClassName(properties.getDriverClassName());
- druidDataSource.setUrl(properties.getUrl());
- druidDataSource.setUsername(properties.getUsername());
- druidDataSource.setPassword(properties.getPassword());
- druidDataSource.setInitialSize(properties.getInitialSize());
- druidDataSource.setMinIdle(properties.getMinIdle());
- druidDataSource.setMaxActive(properties.getMaxActive());
- druidDataSource.setMaxWait(properties.getMaxWait());
- druidDataSource.setTimeBetweenEvictionRunsMillis(properties
- .getTimeBetweenEvictionRunsMillis());
-
druidDataSource.setMinEvictableIdleTimeMillis(properties.getMinEvictableIdleTimeMillis());
- druidDataSource.setValidationQuery(properties.getValidationQuery());
- druidDataSource.setTestWhileIdle(properties.isTestWhileIdle());
- druidDataSource.setTestOnBorrow(properties.isTestOnBorrow());
- druidDataSource.setTestOnReturn(properties.isTestOnReturn());
-
druidDataSource.setPoolPreparedStatements(properties.isPoolPreparedStatements());
- druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(properties
- .getMaxPoolPreparedStatementPerConnectionSize());
- try {
- druidDataSource.setFilters(properties.getFilters());
- if (storeConfig.isMysqlStore()) {
- druidDataSource.init();
- }
- } catch (SQLException e) {
- LOGGER.error("init druidDataSource failed: ", e);
- }
- return druidDataSource;
- }
-
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java
deleted file mode 100644
index e27399a177..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.db;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-@ConfigurationProperties(prefix = "spring.datasource.druid")
-@Getter
-@Setter
-public class DruidDataSourceProperties {
-
- private String driverClassName;
- private String url;
- private String username;
- private String password;
- private int initialSize;
- private int minIdle;
- private int maxActive = 100;
- private long maxWait;
- private long timeBetweenEvictionRunsMillis;
- private long minEvictableIdleTimeMillis;
- private String validationQuery;
- private boolean testWhileIdle;
- private boolean testOnBorrow;
- private boolean testOnReturn;
- private boolean poolPreparedStatements;
- private int maxPoolPreparedStatementPerConnectionSize;
- private String filters;
-
-}
\ No newline at end of file
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java
deleted file mode 100644
index cb3227334b..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.db.dao;
-
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-
-import org.springframework.stereotype.Repository;
-
-@Repository
-public interface AuditDataDao {
-
- int insert(AuditDataPo auditDataPo);
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
deleted file mode 100644
index 60a9a1766e..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.db.entities;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.sql.Timestamp;
-import java.util.Date;
-
-@Getter
-@Setter
-public class AuditDataPo {
-
- private String ip;
- private String dockerId;
- private String threadId;
- private Date sdkTs;
- private Long packetId;
- private Date logTs;
- private String inlongGroupId;
- private String inlongStreamId;
- private String auditId;
- private String auditTag;
- private long auditVersion;
- private Long count;
- private Long size;
- private Long delay;
- private Timestamp updateTime;
-
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
deleted file mode 100644
index 7bebd26090..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.db.entities;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.sql.Timestamp;
-
-@Getter
-@Setter
-public class ClickHouseDataPo {
-
- private String ip;
- private String dockerId;
- private String threadId;
- private Timestamp sdkTs;
- private Long packetId;
- private Timestamp logTs;
- private String inlongGroupId;
- private String inlongStreamId;
- private String auditId;
- private String auditTag;
- private long auditVersion;
- private Long count;
- private Long size;
- private Long delay;
- private Timestamp updateTime;
-
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
deleted file mode 100644
index c41a182f62..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.db.entities;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.util.Date;
-
-@Getter
-@Setter
-public class ESDataPo {
-
- private String ip;
- private String dockerId;
- private String threadId;
- private long sdkTs;
- private Date logTs;
- private String inlongGroupId;
- private String inlongStreamId;
- private String auditId;
- private String auditTag;
- private long auditVersion;
- private long count;
- private long size;
- private long delay;
- private long packetId;
-
- public String getDocId() {
- String docId = ip + dockerId + threadId + sdkTs + packetId + logTs +
inlongGroupId + inlongStreamId
- + auditId + auditTag + count;
- return docId;
- }
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 2e2f4d9469..bf32271a69 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,12 +17,10 @@
package org.apache.inlong.audit.service;
-import org.apache.inlong.audit.config.ClickHouseConfig;
import org.apache.inlong.audit.config.JdbcConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.consts.ConfigConstants;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
import org.apache.inlong.audit.file.RemoteConfigJson;
import org.apache.inlong.audit.service.consume.BaseConsume;
import org.apache.inlong.audit.service.consume.KafkaConsume;
@@ -60,16 +58,8 @@ public class AuditMsgConsumerServer implements
InitializingBean {
@Autowired
private MessageQueueConfig mqConfig;
@Autowired
- private AuditDataDao auditDataDao;
- @Autowired
- private ElasticsearchService esService;
- @Autowired
private StoreConfig storeConfig;
@Autowired
- private ClickHouseConfig chConfig;
- // ClickHouseService
- private ClickHouseService ckService;
- @Autowired
private JdbcConfig jdbcConfig;
private JdbcService jdbcService;
private static final String DEFAULT_CONFIG_PROPERTIES =
"application.properties";
@@ -104,12 +94,6 @@ public class AuditMsgConsumerServer implements
InitializingBean {
if (mqConsume == null) {
LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
}
- if (storeConfig.isElasticsearchStore()) {
- esService.startTimerRoutine();
- }
- if (storeConfig.isClickHouseStore()) {
- ckService.start();
- }
if (storeConfig.isJdbc()) {
jdbcService.start();
}
@@ -123,17 +107,6 @@ public class AuditMsgConsumerServer implements
InitializingBean {
*/
private List<InsertData> getInsertServiceList() {
List<InsertData> insertServiceList = new ArrayList<>();
- if (storeConfig.isMysqlStore()) {
- insertServiceList.add(new MySqlService(auditDataDao));
- }
- if (storeConfig.isElasticsearchStore()) {
- insertServiceList.add(esService);
- }
- if (storeConfig.isClickHouseStore()) {
- // create ck object
- ckService = new ClickHouseService(chConfig);
- insertServiceList.add(ckService);
- }
if (storeConfig.isJdbc()) {
// create jdbc object
jdbcService = new JdbcService(jdbcConfig);
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
deleted file mode 100644
index 47c63aa395..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.service;
-
-import org.apache.inlong.audit.config.ClickHouseConfig;
-import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * ClickHouseService
- */
-public class ClickHouseService implements InsertData, AutoCloseable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ClickHouseService.class);
- private static final String INSERT_SQL = "insert into audit_data (ip,
docker_id, thread_id, \r\n"
- + " sdk_ts, packet_id, log_ts, \r\n"
- + " inlong_group_id, inlong_stream_id, audit_id, audit_tag,
audit_version, \r\n"
- + " count, size, delay, \r\n"
- + " update_time)\r\n"
- + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
-
- private ClickHouseConfig chConfig;
-
- private ScheduledExecutorService timerService =
Executors.newSingleThreadScheduledExecutor();
- private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
- private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
- private AtomicInteger batchCounter = new AtomicInteger(0);
- private AtomicLong lastCheckTime = new
AtomicLong(System.currentTimeMillis());
- private Connection conn;
-
- /**
- * Constructor
- *
- * @param chConfig ClickHouse service config, such as jdbc url, jdbc
username, jdbc password.
- */
- public ClickHouseService(ClickHouseConfig chConfig) {
- this.chConfig = chConfig;
- }
-
- /**
- * start
- */
- public void start() {
- // queue
- this.batchQueue = new LinkedBlockingQueue<>(
- chConfig.getBatchThreshold() * chConfig.getBatchIntervalMs() /
chConfig.getProcessIntervalMs());
- // connection
- try {
- Class.forName(chConfig.getDriver());
- this.reconnect();
- } catch (Exception e) {
- LOG.error("ClickHouseService start failure!", e);
- }
- // start timer
- timerService.scheduleWithFixedDelay(this::processOutput,
- chConfig.getProcessIntervalMs(),
- chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
- }
-
- /**
- * processOutput
- */
- private void processOutput() {
- if (!this.needBatchOutput.get()
- && (System.currentTimeMillis() - lastCheckTime.get() <
chConfig.getBatchIntervalMs())) {
- return;
- }
- // output
- try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL))
{
- int counter = 0;
- // output data to clickhouse
- ClickHouseDataPo data = this.batchQueue.poll();
- while (data != null) {
- pstat.setString(1, data.getIp());
- pstat.setString(2, data.getDockerId());
- pstat.setString(3, data.getThreadId());
- pstat.setTimestamp(4, data.getSdkTs());
- pstat.setLong(5, data.getPacketId());
- pstat.setTimestamp(6, data.getLogTs());
- pstat.setString(7, data.getInlongGroupId());
- pstat.setString(8, data.getInlongStreamId());
- pstat.setString(9, data.getAuditId());
- pstat.setString(10, data.getAuditTag());
- pstat.setLong(11, data.getAuditVersion());
- pstat.setLong(12, data.getCount());
- pstat.setLong(13, data.getSize());
- pstat.setLong(14, data.getDelay());
- pstat.setTimestamp(15, data.getUpdateTime());
- pstat.addBatch();
- this.batchCounter.decrementAndGet();
- if (++counter >= chConfig.getBatchThreshold()) {
- pstat.executeBatch();
- this.conn.commit();
- counter = 0;
- }
- data = this.batchQueue.poll();
- }
- if (counter > 0) {
- pstat.executeBatch();
- this.conn.commit();
- }
- } catch (Exception e1) {
- LOG.error("Execute output to clickhouse failure!", e1);
- // re-connect clickhouse
- try {
- this.reconnect();
- } catch (SQLException e2) {
- LOG.error("Re-connect clickhouse failure!", e2);
- }
- }
- // recover flag
- lastCheckTime.set(System.currentTimeMillis());
- this.needBatchOutput.compareAndSet(true, false);
- }
-
- /**
- * reconnect
- *
- * @throws SQLException Exception when creating connection.
- */
- private void reconnect() throws SQLException {
- if (this.conn != null) {
- try {
- this.conn.close();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- this.conn = null;
- }
- this.conn = DriverManager.getConnection(chConfig.getUrl(),
chConfig.getUsername(),
- chConfig.getPassword());
- this.conn.setAutoCommit(false);
- }
-
- /**
- * insert
- *
- * @param msgBody audit data reading from Pulsar or other MessageQueue.
- */
- @Override
- public void insert(AuditData msgBody) {
- ClickHouseDataPo data = new ClickHouseDataPo();
- data.setIp(msgBody.getIp());
- data.setThreadId(msgBody.getThreadId());
- data.setDockerId(msgBody.getDockerId());
- data.setPacketId(msgBody.getPacketId());
- data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
- data.setLogTs(new Timestamp(msgBody.getLogTs()));
- data.setAuditId(msgBody.getAuditId());
- data.setAuditTag(msgBody.getAuditTag());
- data.setAuditVersion(msgBody.getAuditVersion());
- data.setCount(msgBody.getCount());
- data.setDelay(msgBody.getDelay());
- data.setInlongGroupId(msgBody.getInlongGroupId());
- data.setInlongStreamId(msgBody.getInlongStreamId());
- data.setSize(msgBody.getSize());
- data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
- try {
- this.batchQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (this.batchCounter.incrementAndGet() >=
chConfig.getBatchThreshold()) {
- this.needBatchOutput.compareAndSet(false, true);
- }
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- @Override
- public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
-
- }
-
- /**
- * close
- *
- * @throws Exception Exception when closing ClickHouse connection.
- */
- @Override
- public void close() throws Exception {
- this.conn.close();
- this.timerService.shutdown();
- }
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
deleted file mode 100644
index 3de234146f..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.service;
-
-import org.apache.inlong.audit.config.ElasticsearchConfig;
-import org.apache.inlong.audit.db.entities.ESDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
-
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.rest.RestStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Service;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-@Service
-public class ElasticsearchService implements InsertData, AutoCloseable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchService.class);
-
- private static ScheduledExecutorService timerService =
Executors.newScheduledThreadPool(1);
- private final Semaphore semaphore = new Semaphore(1);
- private List<ESDataPo> datalist = new ArrayList<>();
- @Autowired
- @Qualifier("restClient")
- private RestHighLevelClient client;
- @Autowired
- private ElasticsearchConfig esConfig;
-
- private final static String X_CONTENT_BUILDER_TYPE = "type";
- private final static String X_CONTENT_BUILDER_LONG_VALUE = "long";
- private final static String X_CONTENT_BUILDER_KEYWORD_VALUE = "keyword";
-
- public void startTimerRoutine() {
- timerService.scheduleAtFixedRate((new Runnable() {
-
- @Override
- public void run() {
- try {
- deleteTimeoutIndices();
- } catch (IOException e) {
- LOG.error("deleteTimeoutIndices has err: ", e);
- }
- }
- }), 1, 1, TimeUnit.DAYS);
-
- timerService.scheduleWithFixedDelay((new Runnable() {
-
- @Override
- public void run() {
- try {
- bulkInsert();
- } catch (IOException e) {
- LOG.error("bulkInsert has err: ", e);
- }
- }
- }), esConfig.getBulkInterval(), esConfig.getBulkInterval(),
TimeUnit.SECONDS);
- }
-
- public void insertData(ESDataPo data) {
- if (datalist.size() >= esConfig.getBulkThreshold()) {
- try {
- if (bulkInsert()) {
- LOG.info("success bulk insert {} docs",
esConfig.getBulkThreshold());
- } else {
- LOG.error("failed to bulk insert");
- }
- } catch (IOException e) {
- LOG.error("bulkInsert has err: ", e);
- }
- }
- try {
- semaphore.acquire();
- datalist.add(data);
- semaphore.release();
- } catch (InterruptedException e) {
- LOG.error("datalist semaphore has err: ", e);
- }
- }
-
- protected boolean createIndex(String index) throws IOException {
- if (existsIndex(index)) {
- return true;
- }
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
-
createIndexRequest.settings(Settings.builder().put("index.number_of_shards",
esConfig.getShardsNum())
- .put("index.number_of_replicas", esConfig.getReplicaNum()));
- createIndexRequest.mapping("_doc", generateBuilder());
- CreateIndexResponse response =
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- boolean res = response.isAcknowledged();
- if (res) {
- LOG.info("success creating index {}", index);
- } else {
- LOG.info("fail to create index {}", index);
- }
- return res;
- }
-
- protected boolean existsIndex(String index) throws IOException {
- GetIndexRequest getIndexRequest = new GetIndexRequest();
- getIndexRequest.indices(index);
- return client.indices().exists(getIndexRequest,
RequestOptions.DEFAULT);
- }
-
- protected boolean bulkInsert() throws IOException {
- if (datalist.isEmpty()) {
- return true;
- }
- BulkRequest bulkRequest = new BulkRequest();
- try {
- semaphore.acquire();
- for (ESDataPo esDataPo : datalist) {
- SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
- String index = formatter.format(esDataPo.getLogTs()) + "_" +
esDataPo.getAuditId();
- GsonBuilder gsonBuilder = new GsonBuilder();
-
gsonBuilder.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
- .setDateFormat("yyyy-MM-dd HH:mm:ss");
- Gson gson = gsonBuilder.create();
- String esJson = gson.toJson(esDataPo);
- if (!createIndex(index)) {
- LOG.error("fail to create index {}", index);
- continue;
- }
- IndexRequest indexRequest;
- if (esConfig.isEnableCustomDocId()) {
- indexRequest = new
IndexRequest(index).type("_doc").id(esDataPo.getDocId())
- .source(esJson, XContentType.JSON);
- } else {
- indexRequest = new
IndexRequest(index).type("_doc").source(esJson, XContentType.JSON);
- }
- bulkRequest.add(indexRequest);
- }
- BulkResponse bulkResponse = client.bulk(bulkRequest,
RequestOptions.DEFAULT);
- datalist.clear();
- semaphore.release();
- return bulkResponse.status().equals(RestStatus.OK);
- } catch (InterruptedException e) {
- LOG.error("datalist semaphore has err: ", e);
- }
- return false;
- }
-
- protected void deleteTimeoutIndices() throws IOException {
- List<String> auditIdList = esConfig.getAuditIdList();
- if (auditIdList.isEmpty()) {
- return;
- }
- SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.DATE, -esConfig.getIndexDeleteDay());
- Date deleteDay = calendar.getTime();
- String preIndex = formatter.format(deleteDay);
- for (String auditId : auditIdList) {
- String index = preIndex + "_" + auditId;
- deleteSingleIndex(index);
- }
-
- }
-
- protected boolean deleteSingleIndex(String index) throws IOException {
- if (!existsIndex(index)) {
- return true;
- }
- DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
- AcknowledgedResponse deleteIndexResponse =
client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
- boolean res = deleteIndexResponse.isAcknowledged();
- if (res) {
- LOG.info("success deleting index {}", index);
- } else {
- LOG.error("fail to delete index {}", index);
- }
- return res;
- }
-
- @Override
- public void close() {
- try {
- bulkInsert();
- } catch (IOException e) {
- LOG.error("bulkInsert has err: ", e);
- }
- timerService.shutdown();
- }
-
- protected XContentBuilder generateBuilder() throws IOException {
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.startObject("properties");
- doBuild(builder, "audit_id", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "audit_tag", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "audit_version", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "inlong_group_id", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "inlong_stream_id", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "docker_id", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "thread_id", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "ip", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "log_ts", X_CONTENT_BUILDER_KEYWORD_VALUE);
- doBuild(builder, "sdk_ts", X_CONTENT_BUILDER_LONG_VALUE);
- doBuild(builder, "count", X_CONTENT_BUILDER_LONG_VALUE);
- doBuild(builder, "size", X_CONTENT_BUILDER_LONG_VALUE);
- doBuild(builder, "delay", X_CONTENT_BUILDER_LONG_VALUE);
- doBuild(builder, "packet_id", X_CONTENT_BUILDER_LONG_VALUE);
- builder.endObject();
- builder.endObject();
- return builder;
- }
-
- private void doBuild(XContentBuilder builder, String name, String value)
throws IOException {
- builder.startObject(name);
- builder.field(X_CONTENT_BUILDER_TYPE, value);
- builder.endObject();
- }
-
- /**
- * insert
- *
- * @param msgBody
- */
- @Override
- public void insert(AuditData msgBody) {
- ESDataPo esPo = new ESDataPo();
- esPo.setIp(msgBody.getIp());
- esPo.setThreadId(msgBody.getThreadId());
- esPo.setDockerId(msgBody.getDockerId());
- esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
- esPo.setLogTs(new Date(msgBody.getLogTs()));
- esPo.setAuditId(msgBody.getAuditId());
- esPo.setAuditTag(msgBody.getAuditTag());
- esPo.setAuditVersion(msgBody.getAuditVersion());
- esPo.setCount(msgBody.getCount());
- esPo.setDelay(msgBody.getDelay());
- esPo.setInlongGroupId(msgBody.getInlongGroupId());
- esPo.setInlongStreamId(msgBody.getInlongStreamId());
- esPo.setSize(msgBody.getSize());
- esPo.setPacketId(msgBody.getPacketId());
- this.insertData(esPo);
- }
-
- @Override
- public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
-
- }
-}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
deleted file mode 100644
index 21f471d003..0000000000
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.service;
-
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-
-import java.util.Date;
-
-/**
- * MySqlService
- */
-public class MySqlService implements InsertData {
-
- private final AuditDataDao dao;
-
- public MySqlService(AuditDataDao dao) {
- this.dao = dao;
- }
-
- @Override
- public void insert(AuditData msgBody) {
- AuditDataPo po = new AuditDataPo();
- po.setIp(msgBody.getIp());
- po.setThreadId(msgBody.getThreadId());
- po.setDockerId(msgBody.getDockerId());
- po.setPacketId(msgBody.getPacketId());
- po.setSdkTs(new Date(msgBody.getSdkTs()));
- po.setLogTs(new Date(msgBody.getLogTs()));
- po.setAuditId(msgBody.getAuditId());
- po.setAuditTag(msgBody.getAuditTag());
- po.setAuditVersion(msgBody.getAuditVersion());
- po.setCount(msgBody.getCount());
- po.setDelay(msgBody.getDelay());
- po.setInlongGroupId(msgBody.getInlongGroupId());
- po.setInlongStreamId(msgBody.getInlongStreamId());
- po.setSize(msgBody.getSize());
- dao.insert(po);
- }
-
- @Override
- public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
-
- }
-
-}
diff --git
a/inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml
b/inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml
deleted file mode 100644
index aadf7e47cf..0000000000
--- a/inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.audit.db.dao.AuditDataDao">
- <resultMap id="BaseResultMap"
type="org.apache.inlong.audit.db.entities.AuditDataPo">
- <result column="ip" jdbcType="VARCHAR" property="ip"/>
- <result column="docker_id" jdbcType="VARCHAR" property="dockerId"/>
- <result column="thread_id" jdbcType="VARCHAR" property="threadId"/>
- <result column="sdk_ts" jdbcType="TIMESTAMP" property="sdkTs"/>
- <result column="packet_id" jdbcType="BIGINT" property="packetId"/>
- <result column="log_ts" jdbcType="TIMESTAMP" property="logTs"/>
- <result column="inlong_group_id" jdbcType="VARCHAR"
property="inlongGroupId"/>
- <result column="inlong_stream_id" jdbcType="VARCHAR"
property="inlongStreamId"/>
- <result column="audit_id" jdbcType="VARCHAR" property="auditId"/>
- <result column="audit_tag" jdbcType="VARCHAR" property="auditTag"/>
- <result column="audit_version" jdbcType="BIGINT"
property="auditVersion"/>
- <result column="count" jdbcType="BIGINT" property="count"/>
- <result column="size" jdbcType="BIGINT" property="size"/>
- <result column="delay" jdbcType="BIGINT" property="delay"/>
- <result column="update_time" jdbcType="TIMESTAMP"
property="updateTime"/>
- </resultMap>
-
- <insert id="insert"
parameterType="org.apache.inlong.audit.db.entities.AuditDataPo">
- insert into audit_data (ip, docker_id, thread_id,
- sdk_ts, packet_id, log_ts,
- inlong_group_id, inlong_stream_id, audit_id,
audit_tag, audit_version,
- `count`, size, delay)
- values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR},
#{threadId,jdbcType=VARCHAR},
- #{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT},
#{logTs,jdbcType=TIMESTAMP},
- #{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR}, #{auditId,jdbcType=VARCHAR},
- #{auditTag,jdbcType=VARCHAR}, #{auditVersion,jdbcType=BIGINT},
- #{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT},
#{delay,jdbcType=BIGINT})
- </insert>
-</mapper>
\ No newline at end of file
diff --git
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java
deleted file mode 100644
index 6aa5e3835b..0000000000
---
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.service;
-
-import org.apache.inlong.audit.db.entities.ESDataPo;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(SpringRunner.class)
-@ActiveProfiles(value = {"test"})
-@SpringBootTest(classes = ElasticsearchServiceTest.class)
-public class ElasticsearchServiceTest {
-
- private static ElasticsearchService elasticsearchService;
-
- private final String index = "20220112_1";
-
- @BeforeClass
- public static void setUp() throws IOException {
- elasticsearchService = mock(ElasticsearchService.class);
-
when(elasticsearchService.existsIndex(Mockito.anyString())).thenReturn(true);
-
when(elasticsearchService.createIndex(Mockito.anyString())).thenReturn(true);
-
when(elasticsearchService.deleteSingleIndex(Mockito.anyString())).thenReturn(true);
- }
-
- @Test
- public void testExistsIndex() throws IOException {
- boolean res = elasticsearchService.createIndex(index);
- Assert.assertTrue(res);
-
- res = elasticsearchService.existsIndex(index);
- Assert.assertTrue(res);
- }
-
- @Test
- public void testInsertData() {
- for (int i = 0; i < 5; i++) {
- ESDataPo po = new ESDataPo();
- po.setIp("0.0.0.0");
- po.setThreadId(String.valueOf(i));
- po.setDockerId(String.valueOf(i));
- po.setSdkTs(new Date().getTime());
- po.setLogTs(new Date());
- po.setAuditId("1");
- po.setCount(i);
- po.setDelay(i);
- po.setInlongGroupId(String.valueOf(i));
- po.setInlongStreamId(String.valueOf(i));
- po.setSize(i);
- po.setPacketId(i);
- elasticsearchService.insertData(po);
- }
- }
-
- @Test
- public void testDeleteSingleIndex() throws IOException {
- boolean res = elasticsearchService.createIndex(index);
- Assert.assertTrue(res);
- res = elasticsearchService.deleteSingleIndex(index);
- Assert.assertTrue(res);
- }
-
- @Test
- public void testDeleteTimeoutIndices() throws IOException {
- elasticsearchService.deleteTimeoutIndices();
- }
-
-}
diff --git
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
index e7f55fd594..5ddee887bd 100644
---
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
+++
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
@@ -17,14 +17,11 @@
package org.apache.inlong.audit.service.consume;
-import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.config.JdbcConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.service.ClickHouseService;
-import org.apache.inlong.audit.service.ElasticsearchService;
import org.apache.inlong.audit.service.InsertData;
-import org.apache.inlong.audit.service.MySqlService;
+import org.apache.inlong.audit.service.JdbcService;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,11 +39,9 @@ import static org.mockito.Mockito.when;
public class KafkaConsumeTest {
private KafkaConsumer consumer;
- private AuditDataDao auditDataDao;
- private ElasticsearchService esService;
- private ClickHouseConfig ckConfig;
private StoreConfig storeConfig;
private MessageQueueConfig mqConfig;
+ private JdbcConfig jdbcConfig;
private String topic = "inlong-audit";
private ConsumerRecords records;
@@ -77,9 +72,7 @@ public class KafkaConsumeTest {
*/
private List<InsertData> getInsertServiceList() {
List<InsertData> insertData = new ArrayList<>();
- insertData.add(new MySqlService(auditDataDao));
- insertData.add(esService);
- insertData.add(new ClickHouseService(ckConfig));
+ insertData.add(new JdbcService(jdbcConfig));
return insertData;
}
}
diff --git
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
index 12a5e97efc..4085f29514 100644
---
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
+++
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
@@ -17,14 +17,11 @@
package org.apache.inlong.audit.service.consume;
-import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.config.JdbcConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.service.ClickHouseService;
-import org.apache.inlong.audit.service.ElasticsearchService;
import org.apache.inlong.audit.service.InsertData;
-import org.apache.inlong.audit.service.MySqlService;
+import org.apache.inlong.audit.service.JdbcService;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -41,9 +38,7 @@ import static org.mockito.Mockito.when;
public class TubeConsumeTest {
private PullMessageConsumer pullMessageConsumer;
- private AuditDataDao auditDataDao;
- private ElasticsearchService esService;
- private ClickHouseConfig chConfig;
+ private JdbcConfig jdbcConfig;
private StoreConfig storeConfig;
private MessageQueueConfig mqConfig;
private String topic = "inlong-audit";
@@ -63,6 +58,7 @@ public class TubeConsumeTest {
/**
* testConsume
+ *
* @throws InterruptedException
*/
@Test
@@ -76,13 +72,12 @@ public class TubeConsumeTest {
/**
* getInsertServiceList
+ *
* @return
*/
private List<InsertData> getInsertServiceList() {
List<InsertData> insertServiceList = new ArrayList<>();
- insertServiceList.add(new MySqlService(auditDataDao));
- insertServiceList.add(esService);
- insertServiceList.add(new ClickHouseService(chConfig));
+ insertServiceList.add(new JdbcService(jdbcConfig));
return insertServiceList;
}
}
diff --git
a/inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml
b/inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml
deleted file mode 100644
index 1215347a03..0000000000
--- a/inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.audit.db.dao.AuditDataDao">
- <resultMap id="BaseResultMap"
type="org.apache.inlong.audit.db.entities.AuditDataPo">
- <result column="ip" jdbcType="VARCHAR" property="ip" />
- <result column="docker_id" jdbcType="VARCHAR" property="dockerId" />
- <result column="thread_id" jdbcType="VARCHAR" property="threadId" />
- <result column="sdk_ts" jdbcType="TIMESTAMP" property="sdkTs" />
- <result column="packet_id" jdbcType="BIGINT" property="packetId" />
- <result column="log_ts" jdbcType="TIMESTAMP" property="logTs" />
- <result column="inlong_group_id" jdbcType="VARCHAR"
property="inlongGroupId" />
- <result column="inlong_stream_id" jdbcType="VARCHAR"
property="inlongStreamId" />
- <result column="audit_id" jdbcType="VARCHAR" property="auditId" />
- <result column="count" jdbcType="BIGINT" property="count" />
- <result column="size" jdbcType="BIGINT" property="size" />
- <result column="delay" jdbcType="BIGINT" property="delay" />
- </resultMap>
-
- <insert id="insert"
parameterType="org.apache.inlong.audit.db.entities.AuditDataPo">
- insert into audit_data (ip, docker_id, thread_id,
- sdk_ts, packet_id, log_ts,
- inlong_group_id, inlong_stream_id, audit_id,
- count, size, delay)
- values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR},
#{threadId,jdbcType=VARCHAR},
- #{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT},
#{logTs,jdbcType=TIMESTAMP},
- #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{auditId,jdbcType=VARCHAR},
- #{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT},
#{delay,jdbcType=BIGINT})
- </insert>
-</mapper>
\ No newline at end of file
diff --git a/inlong-audit/conf/application.properties
b/inlong-audit/conf/application.properties
index d721e375cb..b1587668b3 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -16,37 +16,11 @@
# specific language governing permissions and limitations
# under the License.
#
-# datasource config
-# datasource config, set org.postgresql.Driver if using PostgreSQL
-spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
-spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL&allowPublicKeyRetrieval=true
-spring.datasource.druid.username=root
-spring.datasource.druid.password=inlong
-spring.datasource.druid.filters=stat,log4j,config
-spring.datasource.druid.max-active=100
-spring.datasource.druid.initial-size=1
-spring.datasource.druid.max-wait=60000
-spring.datasource.druid.min-idle=1
-spring.datasource.druid.time-between-eviction-runs-millis=60000
-spring.datasource.druid.min-evictable-idle-time-millis=300000
-spring.datasource.druid.validation-query=select 'x'
-spring.datasource.druid.test-while-idle=true
-spring.datasource.druid.test-on-borrow=false
-spring.datasource.druid.test-on-return=false
-spring.datasource.druid.pool-prepared-statements=true
-spring.datasource.druid.filter.wall.config.multi-statement-allow=true
-spring.datasource.druid.max-open-prepared-statements=50
-spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20
-
-# mybatis config
-mybatis.mapper-locations=classpath*:mapper/*.xml
-mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
-
# proxy.type: pulsar / tube / kafka
audit.config.proxy.type=pulsar
-# store.server: mysql / clickhouse / elasticsearch
-audit.config.store.mode=mysql
+# Supports common JDBC protocol
+audit.config.store.mode=jdbc
# manger config
manager.hosts=127.0.0.1:8083
@@ -70,35 +44,9 @@ audit.kafka.topic.replicationFactor=2
audit.kafka.consumer.name=inlong-audit-consumer
audit.kafka.group.id=audit-consumer-group
-# Invalid data will be discarded if exceed the threshold days
-msg.valid.threshold.days=7
-
-# es config
-elasticsearch.host=127.0.0.1
-elasticsearch.port=9200
-elasticsearch.authEnable=false
-elasticsearch.username=elastic
-elasticsearch.password=inlong
-elasticsearch.shardsNum=5
-elasticsearch.replicaNum=1
-elasticsearch.indexDeleteDay=5
-elasticsearch.enableDocId=true
-elasticsearch.bulkInterval=10
-elasticsearch.bulkThreshold=5000
-elasticsearch.auditIdSet=1,2
-
-# clickhouse config
-clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
-clickhouse.url=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
-clickhouse.username=default
-clickhouse.password=default
-clickhouse.batchIntervalMs=1000
-clickhouse.batchThreshold=500
-clickhouse.processIntervalMs=100
-
# Generic jdbc storage
jdbc.driver=com.mysql.cj.jdbc.Driver
-jdbc.url=jdbc:mysql://127.0.0.1:9020/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
-jdbc.username=*******
-jdbc.password=********
+jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
+jdbc.username=root
+jdbc.password=inlong
diff --git a/inlong-audit/conf/mapper/AuditDataDao.xml
b/inlong-audit/conf/mapper/AuditDataDao.xml
deleted file mode 100644
index ffb6ede2d3..0000000000
--- a/inlong-audit/conf/mapper/AuditDataDao.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.audit.db.dao.AuditDataDao">
- <resultMap id="BaseResultMap"
type="org.apache.inlong.audit.db.entities.AuditDataPo">
- <result column="ip" jdbcType="VARCHAR" property="ip" />
- <result column="docker_id" jdbcType="VARCHAR" property="dockerId" />
- <result column="thread_id" jdbcType="VARCHAR" property="threadId" />
- <result column="sdk_ts" jdbcType="TIMESTAMP" property="sdkTs" />
- <result column="packet_id" jdbcType="BIGINT" property="packetId" />
- <result column="log_ts" jdbcType="TIMESTAMP" property="logTs" />
- <result column="inlong_group_id" jdbcType="VARCHAR"
property="inlongGroupId" />
- <result column="inlong_stream_id" jdbcType="VARCHAR"
property="inlongStreamId" />
- <result column="audit_id" jdbcType="VARCHAR" property="auditId" />
- <result column="count" jdbcType="BIGINT" property="count" />
- <result column="size" jdbcType="BIGINT" property="size" />
- <result column="delay" jdbcType="BIGINT" property="delay" />
- </resultMap>
-
- <insert id="insert"
parameterType="org.apache.inlong.audit.db.entities.AuditDataPo">
- insert into audit_data (ip, docker_id, thread_id,
- sdk_ts, packet_id, log_ts,
- inlong_group_id, inlong_stream_id, audit_id,
- count, size, delay)
- values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR},
#{threadId,jdbcType=VARCHAR},
- #{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT},
#{logTs,jdbcType=TIMESTAMP},
- #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{auditId,jdbcType=VARCHAR},
- #{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT},
#{delay,jdbcType=BIGINT})
- </insert>
-</mapper>
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
deleted file mode 100644
index 7e96f5e8bc..0000000000
--- a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- ----------------------------
--- Database for InLong Audit
--- ----------------------------
-CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
-
-USE apache_inlong_audit;
-
--- ----------------------------
--- Table structure for audit_data
--- The table creation statement of the audit flow table is used to record the
real-time flow data of the audit.
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_data`
-(
- `ip` String COMMENT 'Client IP',
- `docker_id` String COMMENT 'Client docker id',
- `thread_id` String COMMENT 'Client thread id',
- `sdk_ts` DateTime COMMENT 'SDK timestamp',
- `packet_id` Int64 COMMENT 'Packet id',
- `log_ts` DateTime COMMENT 'Log timestamp',
- `inlong_group_id` String COMMENT 'The target inlong group id',
- `inlong_stream_id` String COMMENT 'The target inlong stream id',
- `audit_id` String COMMENT 'Audit id',
- `audit_tag` String COMMENT 'Audit tag',
- `count` Int64 COMMENT 'Message count',
- `size` Int64 COMMENT 'Message size',
- `delay` Int64 COMMENT 'Message delay',
- `update_time` DateTime COMMENT 'Update time'
-) ENGINE = MergeTree
-ORDER BY inlong_group_id
-SETTINGS index_granularity = 8192;
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index 4914fd7854..333052c97f 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -102,7 +102,7 @@ CREATE TABLE IF NOT EXISTS `leader_selector`
`leader_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
NOT NULL,
`last_seen_active` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`service_id`)
-) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci
COMMENT = 'selector db'
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci
COMMENT = 'selector db';
-- ----------------------------
-- Table structure for audit id config
@@ -113,7 +113,7 @@ CREATE TABLE IF NOT EXISTS `audit_id_config`
`status` int(11) DEFAULT '1' COMMENT 'Audit source config status.
0:Offline,1:Online',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`audit_id`)
-) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit id config'
+) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit id config';
-- ----------------------------
@@ -130,5 +130,5 @@ CREATE TABLE IF NOT EXISTS `audit_source_config`
`status` int(11) DEFAULT '1' COMMENT 'Audit source config status.
0:Offline,1:Online',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`source_name`, `jdbc_url`)
-) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config'
+) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config';