This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a2e0f464dc [INLONG-10085][Audit] Optimize the performance of the
audit-service (#10088)
a2e0f464dc is described below
commit a2e0f464dc649268e4bdeb5d1df8b46bbb2e5ef1
Author: doleyzi <[email protected]>
AuthorDate: Fri Apr 26 20:56:00 2024 +0800
[INLONG-10085][Audit] Optimize the performance of the audit-service (#10088)
---
inlong-audit/audit-service/pom.xml | 6 +
.../apache/inlong/audit/cache/AbstractCache.java | 62 +++++++--
.../apache/inlong/audit/cache/RealTimeQuery.java | 134 +++++++++++-------
.../inlong/audit/config/ConfigConstants.java | 13 ++
.../inlong/audit/config/OpenApiConstants.java | 7 +-
.../apache/inlong/audit/config/SqlConstants.java | 78 ++++++-----
.../apache/inlong/audit/entities/SinkConfig.java | 2 +-
.../apache/inlong/audit/entities/SourceConfig.java | 3 +
.../org/apache/inlong/audit/entities/StatData.java | 3 +-
.../org/apache/inlong/audit/main/Application.java | 66 +++++++++
.../inlong/audit/selector/impl/DBDataSource.java | 4 +-
.../apache/inlong/audit/service/ApiService.java | 153 ++++++++++++---------
.../apache/inlong/audit/service/EtlService.java | 67 +--------
.../org/apache/inlong/audit/sink/JdbcSink.java | 2 +-
.../org/apache/inlong/audit/source/JdbcSource.java | 8 +-
15 files changed, 379 insertions(+), 229 deletions(-)
diff --git a/inlong-audit/audit-service/pom.xml
b/inlong-audit/audit-service/pom.xml
index 2a67f41afc..e71c33586b 100644
--- a/inlong-audit/audit-service/pom.xml
+++ b/inlong-audit/audit-service/pom.xml
@@ -86,6 +86,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
<resources>
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
index b36d4b53e3..3ad1a43bff 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -20,19 +20,23 @@ package org.apache.inlong.audit.cache;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.CacheUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
@@ -50,6 +54,9 @@ public class AbstractCache {
protected AuditCycle auditCycle;
private static final int DEFAULT_MONITOR_INTERVAL = 1;
+ // According to the startTime and endTime of the request parameters, the
maximum number of cache keys generated.
+ private static final int MAX_CACHE_KEY_SIZE = 1440;
+
protected AbstractCache(AuditCycle auditCycle) {
cache = Caffeine.newBuilder()
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
@@ -77,18 +84,55 @@ public class AbstractCache {
}
/**
- * Get data
*
- * @param key
+ * @param startTime
+ * @param endTime
+ * @param inlongGroupId
+ * @param inlongStreamId
+ * @param auditId
+ * @param auditTag
* @return
*/
- public List<StatData> getData(String key) {
- StatData statData = cache.getIfPresent(key);
- if (null == statData) {
- // Compatible with scenarios where the auditTag openapi parameter
can be empty.
- statData = cache.getIfPresent(key + DEFAULT_AUDIT_TAG);
+ public List<StatData> getData(String startTime, String endTime, String
inlongGroupId,
+ String inlongStreamId, String auditId, String auditTag) {
+ List<StatData> result = new LinkedList<>();
+ List<String> keyList = buildCacheKeyList(startTime, endTime,
inlongGroupId,
+ inlongStreamId, auditId, auditTag);
+ for (String cacheKey : keyList) {
+ StatData statData = cache.getIfPresent(cacheKey);
+ if (null == statData) {
+ // Compatible with scenarios where the auditTag openapi
parameter can be empty.
+ statData = cache.getIfPresent(cacheKey + DEFAULT_AUDIT_TAG);
+ }
+ if (null != statData) {
+ result.add(statData);
+ }
+ }
+ return result;
+ }
+
+ private List<String> buildCacheKeyList(String startTime, String endTime,
String inlongGroupId,
+ String inlongStreamId, String auditId, String auditTag) {
+ List<String> keyList = new LinkedList<>();
+ try {
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ Date startDate = dateFormat.parse(startTime);
+ Date endDate = dateFormat.parse(endTime);
+ for (int index = 0; index < MAX_CACHE_KEY_SIZE; index++) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(startDate);
+ calendar.add(Calendar.MINUTE, index * auditCycle.getValue());
+ calendar.set(Calendar.SECOND, 0);
+ if (calendar.getTime().compareTo(endDate) > 0) {
+ break;
+ }
+ String time = dateFormat.format(calendar.getTime());
+ keyList.add(CacheUtils.buildCacheKey(time, inlongGroupId,
inlongStreamId, auditId, auditTag));
+ }
+ } catch (Exception exception) {
+ LOGGER.error("It has exception when build cache key list!",
exception);
}
- return statData == null ? new LinkedList<>() :
Collections.singletonList(statData);
+ return keyList;
}
/**
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
index b94388a43e..ff9de5c6d5 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
@@ -21,9 +21,9 @@ import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.service.ConfigService;
+import org.apache.inlong.audit.utils.CacheUtils;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.dbcp.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,22 +33,21 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
-import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
-import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
@@ -64,7 +63,7 @@ public class RealTimeQuery {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealTimeQuery.class);
private static volatile RealTimeQuery realTimeQuery = null;
- private final List<DataSource> dataSourceList = new LinkedList<>();
+ private final List<BasicDataSource> dataSourceList = new LinkedList<>();
private final String queryLogTsSql;
private final String queryIdsByIpSql;
@@ -73,7 +72,26 @@ public class RealTimeQuery {
private RealTimeQuery() {
List<JdbcConfig> jdbcConfigList =
ConfigService.getInstance().getAllAuditSource();
for (JdbcConfig jdbcConfig : jdbcConfigList) {
- dataSourceList.add(createDataSource(jdbcConfig));
+ BasicDataSource dataSource = new BasicDataSource();
+ dataSource.setDriverClassName(jdbcConfig.getDriverClass());
+ dataSource.setUrl(jdbcConfig.getJdbcUrl());
+ dataSource.setUsername(jdbcConfig.getUserName());
+ dataSource.setPassword(jdbcConfig.getPassword());
+
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
+ DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
+
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
+
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+ DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+ dataSource.setTestOnBorrow(true);
+ dataSource.setValidationQuery("SELECT 1");
+ dataSource
+
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
+ DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+
+ dataSourceList.add(dataSource);
}
queryLogTsSql =
Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
@@ -95,29 +113,6 @@ public class RealTimeQuery {
return realTimeQuery;
}
- /**
- * Create data source.
- */
- private DataSource createDataSource(JdbcConfig jdbcConfig) {
- HikariConfig config = new HikariConfig();
- config.setDriverClassName(jdbcConfig.getDriverClass());
- config.setJdbcUrl(jdbcConfig.getJdbcUrl());
- config.setUsername(jdbcConfig.getUserName());
- config.setPassword(jdbcConfig.getPassword());
-
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
- DEFAULT_CONNECTION_TIMEOUT));
- config.addDataSourceProperty(CACHE_PREP_STMTS,
- Configuration.getInstance().get(KEY_CACHE_PREP_STMTS,
DEFAULT_CACHE_PREP_STMTS));
- config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
- Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE,
DEFAULT_PREP_STMT_CACHE_SIZE));
- config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
- Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT,
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
- config.setMaximumPoolSize(
- Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
- DEFAULT_DATASOURCE_POOL_SIZE));
- return new HikariDataSource(config);
- }
-
/**
* Query the audit data of log time.
*
@@ -130,17 +125,57 @@ public class RealTimeQuery {
*/
public List<StatData> queryLogTs(String startTime, String endTime, String
inlongGroupId,
String inlongStreamId, String auditId) {
+ long currentTime = System.currentTimeMillis();
List<StatData> statDataList = new LinkedList<>();
+ if (dataSourceList.isEmpty()) {
+ return statDataList;
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
for (DataSource dataSource : dataSourceList) {
- statDataList =
- doQueryLogTs(dataSource, startTime, endTime,
inlongGroupId, inlongStreamId, auditId);
- if (!statDataList.isEmpty()) {
- break;
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ List<StatData> statDataListTemp =
+ doQueryLogTs(dataSource, startTime, endTime,
inlongGroupId, inlongStreamId, auditId);
+ statDataList.addAll(statDataListTemp);
+ });
+ futures.add(future);
+ }
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[futures.size()])).join();
+ LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms",
startTime, endTime, inlongGroupId,
+ inlongStreamId, auditId, System.currentTimeMillis() -
currentTime);
+ return filterMaxAuditVersion(statDataList);
+ }
+
+ /**
+ * @param allStatData
+ * @return
+ */
+ public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
+ HashMap<String, List<StatData>> allData = new HashMap<>();
+ for (StatData statData : allStatData) {
+ String dataKey = CacheUtils.buildCacheKey(
+ statData.getLogTs(),
+ statData.getInlongGroupId(),
+ statData.getInlongStreamId(),
+ statData.getAuditId(),
+ statData.getAuditTag());
+ List<StatData> statDataList = allData.computeIfAbsent(dataKey, k
-> new LinkedList<>());
+ statDataList.add(statData);
+ }
+ List<StatData> result = new LinkedList<>();
+ for (Map.Entry<String, List<StatData>> entry : allData.entrySet()) {
+ long maxAuditVersion = Long.MIN_VALUE;
+ for (StatData maxData : entry.getValue()) {
+ maxAuditVersion =
+ maxData.getAuditVersion() > maxAuditVersion ?
maxData.getAuditVersion() : maxAuditVersion;
+ }
+ for (StatData statData : entry.getValue()) {
+ if (statData.getAuditVersion() == maxAuditVersion) {
+ result.add(statData);
+ break;
+ }
}
- LOGGER.info("Change another audit source to query data! Params is:
{} {} {} {} {}",
- startTime, endTime, inlongGroupId, inlongStreamId,
auditId);
}
- return statDataList;
+ return result;
}
/**
@@ -175,6 +210,7 @@ public class RealTimeQuery {
data.setCount(resultSet.getLong(6));
data.setSize(resultSet.getLong(7));
data.setDelay(resultSet.getLong(8));
+ data.setAuditVersion(resultSet.getLong(9));
result.add(data);
}
} catch (SQLException sqlException) {
@@ -203,6 +239,8 @@ public class RealTimeQuery {
break;
}
}
+ LOGGER.info("Query ids by params:{} {} {} {}, result size:{} ",
startTime,
+ endTime, ip, auditId, statDataList.size());
return statDataList;
}
@@ -265,6 +303,8 @@ public class RealTimeQuery {
break;
}
}
+ LOGGER.info("Query ips by params:{} {} {} {} {}, result size:{} ",
+ startTime, endTime, inlongGroupId, inlongStreamId, auditId,
statDataList.size());
return statDataList;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index 8d7ddf7f80..57701790d1 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -29,11 +29,24 @@ public class ConfigConstants {
public static final String KEY_MYSQL_USERNAME = "mysql.username";
public static final String KEY_MYSQL_PASSWORD = "mysql.password";
+ public static final String KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS =
"datasource.max.total.connections";
+ public static final int DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS = 10;
+
+ public static final String KEY_DATASOURCE_MAX_IDLE_CONNECTIONS =
"datasource.max.idle.connections";
+ public static final int DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS = 2;
+
+ public static final String KEY_DATASOURCE_MIN_IDLE_CONNECTIONS =
"datasource.min.idle.connections";
+ public static final int DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS = 1;
+
+ public static final String KEY_DATASOURCE_DETECT_INTERVAL_MS =
"datasource.detect.interval.ms";
+ public static final int DEFAULT_DATASOURCE_DETECT_INTERVAL_MS = 60000;
+
// Time config
public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT =
"datasource.connection.timeout.ms";
public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5;
public static final String KEY_QUEUE_PULL_TIMEOUT =
"queue.pull.timeout.ms";
public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
+ public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
// Interval config
public static final String KEY_SOURCE_DB_STAT_INTERVAL =
"source.db.stat.interval.minute";
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index c2f4fd3d71..05643c43bf 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -33,8 +33,8 @@ public class OpenApiConstants {
public static final String DEFAULT_API_GET_IPS_PATH =
"/audit/query/getIps";
public static final String KEY_API_GET_IDS_PATH = "api.get.ids.path";
public static final String DEFAULT_API_GET_IDS_PATH =
"/audit/query/getIds";
- public static final String KEY_API_POOL_SIZE = "api.pool.size";
- public static final int DEFAULT_POOL_SIZE = 10;
+ public static final String KEY_API_THREAD_POOL_SIZE =
"api.thread.pool.size";
+ public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
public static final int DEFAULT_API_BACKLOG_SIZE = 100;
public static final String KEY_API_REAL_LIMITER_QPS =
"api.real.limiter.qps";
@@ -61,7 +61,8 @@ public class OpenApiConstants {
public static final String KEY_HTTP_BODY_ERR_DATA = "data";
public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
public static final String VALUE_HTTP_HEADER_CONTENT_TYPE =
"application/json;charset=utf-8";
- public static final int BIND_PORT = 80;
+ public static final String KEY_HTTP_SERVER_BIND_PORT =
"api.http.server.bind.port";
+ public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
public static final int HTTP_RESPOND_CODE = 200;
public static final String DEFAULT_PARAMS_AUDIT_TAG = "";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 648ed1d72a..884172d32a 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -37,25 +37,38 @@ public class SqlConstants {
// Source query sql
public static final String KEY_SOURCE_STAT_SQL = "source.stat.sql";
public static final String DEFAULT_SOURCE_STAT_SQL =
- "SELECT inlong_group_id, inlong_stream_id, audit_id\n" +
- "\t, audit_tag, cnt, size, delay,MAX(audit_version)\n" +
+ "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+ "\t, SUM(cnt) AS cnt, SUM(size) AS size\n" +
+ "\t, SUM(delay) AS delay\n" +
"FROM (\n" +
- "\tSELECT audit_version, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
- "\t\t, SUM(count) AS cnt, SUM(size) AS size\n" +
- "\t\t, SUM(delay) AS delay\n" +
+ "\tSELECT t_all_version.log_ts,
t_all_version.inlong_group_id, t_all_version.inlong_stream_id,
t_all_version.audit_id, t_all_version.audit_tag\n"
+ +
+ "\t\t, t_all_version.cnt, t_all_version.size,
t_all_version.delay\n" +
"\tFROM (\n" +
- "\t\tSELECT audit_version, docker_id, thread_id, sdk_ts,
packet_id\n" +
- "\t\t\t, log_ts, ip, inlong_group_id, inlong_stream_id,
audit_id\n" +
- "\t\t\t, audit_tag, count, size, delay\n" +
+ "\t\tSELECT audit_version, log_ts, inlong_group_id,
inlong_stream_id, audit_id\n" +
+ "\t\t\t, audit_tag, SUM(count) AS cnt, SUM(size) AS
size\n" +
+ "\t\t\t, SUM(delay) AS delay\n" +
"\t\tFROM audit_data\n" +
- "\t\tWHERE log_ts BETWEEN ? AND ? \n" +
- "\t\t\tAND audit_id = ? \n" +
- "\t\tGROUP BY audit_version, docker_id, thread_id, sdk_ts,
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag,
count, size, delay\n "
- +
- "\t) t1\n" +
- "\tGROUP BY audit_version, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
- ") t2\n" +
- "GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag, cnt, size, delay";
+ "\t\tWHERE log_ts BETWEEN ? AND ?\n" +
+ "\t\t\tAND audit_id = ?\n" +
+ "\t\tGROUP BY audit_version, log_ts, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
+ "\t) t_all_version\n" +
+ "\t\tJOIN (\n" +
+ "\t\t\tSELECT max(audit_version) AS audit_version, log_ts,
inlong_group_id, inlong_stream_id\n" +
+ "\t\t\t\t, audit_id, audit_tag\n" +
+ "\t\t\tFROM audit_data\n" +
+ "\t\t\tWHERE log_ts BETWEEN ? AND ?\n" +
+ "\t\t\t\tAND audit_id = ?\n" +
+ "\t\t\tGROUP BY log_ts, inlong_group_id, inlong_stream_id,
audit_id, audit_tag\n" +
+ "\t\t) t_max_version\n" +
+ "\t\tON t_all_version.audit_version =
t_max_version.audit_version\n" +
+ "\t\t\tAND t_all_version.log_ts = t_max_version.log_ts\n" +
+ "\t\t\tAND t_all_version.inlong_group_id =
t_max_version.inlong_group_id\n" +
+ "\t\t\tAND t_all_version.inlong_stream_id =
t_max_version.inlong_stream_id\n" +
+ "\t\t\tAND t_all_version.audit_id =
t_max_version.audit_id\n" +
+ "\t\t\tAND t_all_version.audit_tag =
t_max_version.audit_tag\n" +
+ ") t_sum\n" +
+ "GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag";
public static final String KEY_SOURCE_QUERY_IPS_SQL =
"source.query.ips.sql";
public static final String DEFAULT_SOURCE_QUERY_IPS_SQL =
@@ -82,27 +95,22 @@ public class SqlConstants {
public static final String KEY_SOURCE_QUERY_MINUTE_SQL =
"source.query.minute.sql";
public static final String DEFAULT_SOURCE_QUERY_MINUTE_SQL =
"SELECT log_ts, inlong_group_id, inlong_stream_id, audit_id,
audit_tag\n" +
- "\t, cnt, size, delay, max(audit_version)\n" +
+ "\t, sum(count) AS cnt, sum(size) AS size\n" +
+ "\t, sum(delay) AS delay, audit_version\n" +
"FROM (\n" +
- "\tSELECT audit_version, log_ts, inlong_group_id,
inlong_stream_id, audit_id\n" +
- "\t\t, audit_tag, sum(count) AS cnt, sum(size) AS size\n" +
- "\t\t, sum(delay) AS delay\n" +
- "\tFROM (\n" +
- "\t\tSELECT audit_version, docker_id, thread_id, sdk_ts,
packet_id\n" +
- "\t\t\t, log_ts, ip, inlong_group_id, inlong_stream_id,
audit_id\n" +
- "\t\t\t, audit_tag, count, size, delay\n" +
- "\t\tFROM audit_data\n" +
- "\t\tWHERE log_ts BETWEEN ? AND ?\n" +
- "\t\t\tAND inlong_group_id = ?\n" +
- "\t\t\tAND inlong_stream_id = ?\n" +
- "\t\t\tAND audit_id = ? \n" +
- "\t\tGROUP BY audit_version, docker_id, thread_id, sdk_ts,
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag,
count, size, delay\n"
+ "\tSELECT audit_version, docker_id, thread_id, sdk_ts,
packet_id\n" +
+ "\t\t, log_ts, ip, inlong_group_id, inlong_stream_id,
audit_id\n" +
+ "\t\t, audit_tag, count, size, delay\n" +
+ "\tFROM audit_data\n" +
+ "\tWHERE log_ts BETWEEN ? AND ?\n" +
+ "\t\tAND inlong_group_id = ?\n" +
+ "\t\tAND inlong_stream_id = ?\n" +
+ "\t\tAND audit_id = ?\n" +
+ "\tGROUP BY audit_version, docker_id, thread_id, sdk_ts,
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag,
count, size, delay\n"
+
- "\t) t1\n" +
- "\tGROUP BY audit_version, log_ts, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
- ") t2\n" +
- "GROUP BY log_ts, inlong_group_id, inlong_stream_id,
audit_id, audit_tag, cnt, size, delay \n" +
- "limit 1440 ";
+ ") t_distinct\n" +
+ "GROUP BY audit_version, log_ts, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
+ "LIMIT 1440";
// Mysql query sql
public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL =
"mysql.query.temp.sql";
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
index d2e137ec83..3cb15a9016 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
@@ -30,7 +30,7 @@ public class SinkConfig {
private String insertSql;
private final String driverClassName;
private final String jdbcUrl;
- private final String username;
+ private final String userName;
private final String password;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
index 428ca0389f..88730a203a 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
@@ -17,12 +17,14 @@
package org.apache.inlong.audit.entities;
+import lombok.AllArgsConstructor;
import lombok.Data;
/**
* Source config
*/
@Data
+@AllArgsConstructor
public class SourceConfig {
private AuditCycle auditCycle;
@@ -32,6 +34,7 @@ public class SourceConfig {
private final String jdbcUrl;
private final String username;
private final String password;
+ private boolean needJoin = false;
public SourceConfig(AuditCycle auditCycle,
String querySql,
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
index f8f8981f58..910aec7ce8 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
@@ -24,7 +24,7 @@ import java.sql.Timestamp;
@Data
public class StatData {
- private String auditVersion;
+ private long auditVersion;
private String logTs;
private String inlongGroupId;
private String inlongStreamId;
@@ -35,4 +35,5 @@ public class StatData {
private Long delay;
private Timestamp updateTime;
private String ip;
+ private String sourceName;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
index ecd13953fc..067667133d 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
@@ -17,18 +17,34 @@
package org.apache.inlong.audit.main;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.selector.api.Selector;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.api.SelectorFactory;
import org.apache.inlong.audit.service.ApiService;
import org.apache.inlong.audit.service.ConfigService;
import org.apache.inlong.audit.service.EtlService;
+import org.apache.inlong.audit.utils.JdbcUtils;
+import org.apache.inlong.common.util.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.UUID;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
+
public class Application {
private static final Logger LOGGER =
LoggerFactory.getLogger(Application.class);
private static final EtlService etlService = new EtlService();
private static final ApiService apiService = new ApiService();
+ private static Selector selector;
+ private static boolean running = true;
public static void main(String[] args) {
try {
@@ -41,7 +57,11 @@ public class Application {
// Api service provide audit data interface to external services
apiService.start();
+ // Cleanup resource when program exit.
stopIfKilled();
+
+ // Waiting to become the leader node.
+ waitToBeLeader();
} catch (Exception ex) {
LOGGER.error("Running exception: ", ex);
}
@@ -50,12 +70,58 @@ public class Application {
private static void stopIfKilled() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
+ running = false;
etlService.stop();
apiService.stop();
+ selector.close();
LOGGER.info("Stopping gracefully");
} catch (Exception ex) {
LOGGER.error("Stop error: ", ex);
}
}));
}
+
+ /**
+ * Init selector
+ */
+ private static void initSelector() {
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+ String leaderId = NetworkUtils.getLocalIp() + "-" + UUID.randomUUID();
+ LOGGER.info("Init selector. Leader id is :{}", leaderId);
+ if (selector == null) {
+ SelectorConfig electorConfig = new SelectorConfig(
+ Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID,
DEFAULT_SELECTOR_SERVICE_ID),
+ leaderId,
+ jdbcConfig.getJdbcUrl(),
+ jdbcConfig.getUserName(), jdbcConfig.getPassword(),
jdbcConfig.getDriverClass());
+
+ selector = SelectorFactory.getNewElector(electorConfig);
+ try {
+ selector.init();
+ } catch (Exception e) {
+ LOGGER.error("Init selector has exception:", e);
+ }
+ }
+ }
+
+ /**
+ * Wait to be leader
+ */
+ private static void waitToBeLeader() {
+ initSelector();
+ while (running) {
+ try {
+
Thread.sleep(Configuration.getInstance().get(KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS,
+ DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS));
+ } catch (Exception e) {
+ LOGGER.error("Wait to be Leader has exception! lost
Leadership!", e);
+ }
+
+ if (selector.isLeader()) {
+ LOGGER.info("I get Leadership! Begin to aggregate clickhouse
data to mysql");
+ etlService.auditSourceToMysql();
+ return;
+ }
+ }
+ }
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
index 889cf87cb2..14340be873 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
@@ -165,9 +165,9 @@ public class DBDataSource {
try {
int result = executeUpdate(selectorSql);
if (result == 2) {
- LOGGER.info("{} get the leader",
selectorConfig.getLeaderId());
+ LOGGER.info("{} become the leader",
selectorConfig.getLeaderId());
} else if (result == 1) {
- LOGGER.info("{} do not get the leader",
selectorConfig.getLeaderId());
+ LOGGER.info("{} waiting to be the leader",
selectorConfig.getLeaderId());
}
} catch (Exception exception) {
LOGGER.error("Exception: {} ,sql:{}", exception.getMessage(),
selectorSql);
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index 3b7c2a8264..d8dcb7ebbf 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -26,7 +26,6 @@ import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.ApiType;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.StatData;
-import org.apache.inlong.audit.utils.CacheUtils;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
@@ -44,9 +43,9 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import static org.apache.inlong.audit.config.OpenApiConstants.BIND_PORT;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
@@ -54,8 +53,9 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IP
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_PARAMS_AUDIT_TAG;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
@@ -63,12 +63,13 @@ import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PA
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
@@ -87,7 +88,6 @@ import static
org.apache.inlong.audit.entities.ApiType.MINUTES;
public class ApiService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ApiService.class);
-
public void start() {
initHttpServer();
}
@@ -97,11 +97,12 @@ public class ApiService {
}
private void initHttpServer() {
+ int bindPort =
Configuration.getInstance().get(KEY_HTTP_SERVER_BIND_PORT,
DEFAULT_HTTP_SERVER_BIND_PORT);
try {
- HttpServer server = HttpServer.create(new
InetSocketAddress(BIND_PORT),
+ HttpServer server = HttpServer.create(new
InetSocketAddress(bindPort),
Configuration.getInstance().get(KEY_API_BACKLOG_SIZE,
DEFAULT_API_BACKLOG_SIZE));
server.setExecutor(Executors.newFixedThreadPool(
- Configuration.getInstance().get(KEY_API_POOL_SIZE,
DEFAULT_POOL_SIZE)));
+ Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE,
DEFAULT_API_THREAD_POOL_SIZE)));
server.createContext(Configuration.getInstance().get(KEY_API_DAY_PATH,
DEFAULT_API_DAY_PATH),
new AuditHandler(DAY));
server.createContext(Configuration.getInstance().get(KEY_API_HOUR_PATH,
DEFAULT_API_HOUR_PATH),
@@ -113,15 +114,19 @@ public class ApiService {
server.createContext(Configuration.getInstance().get(KEY_API_GET_IPS_PATH,
DEFAULT_API_GET_IPS_PATH),
new AuditHandler(GET_IPS));
server.start();
+ LOGGER.info("Init http server success. Bind port is: {}", bindPort);
} catch (Exception e) {
LOGGER.error("Init http server has exception!", e);
}
}
- static class AuditHandler implements HttpHandler, AutoCloseable {
+ class AuditHandler implements HttpHandler, AutoCloseable {
private final ApiType apiType;
private final RateLimiter limiter;
+ private final ExecutorService executorService =
+ Executors.newFixedThreadPool(
+
Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE,
DEFAULT_API_THREAD_POOL_SIZE));
public AuditHandler(ApiType apiType) {
this.apiType = apiType;
@@ -134,26 +139,32 @@ public class ApiService {
if (null != limiter) {
limiter.acquire();
}
+ executorService.execute(new Runnable() {
- try (OutputStream os = exchange.getResponseBody()) {
- JsonObject responseJson = new JsonObject();
-
- Map<String, String> params =
parseRequestURI(exchange.getRequestURI().getQuery());
- if (checkNecessaryParams(params)) {
- handleLegalParams(responseJson, params);
- } else {
- handleInvalidParams(responseJson, exchange);
- }
+ @Override
+ public void run() {
+ try (OutputStream os = exchange.getResponseBody()) {
+ JsonObject responseJson = new JsonObject();
+ Map<String, String> params =
parseRequestURI(exchange.getRequestURI().getQuery());
+ if (checkNecessaryParams(params)) {
+ handleLegalParams(responseJson, params);
+ } else {
+ handleInvalidParams(responseJson, exchange);
+ }
- byte[] bytes =
responseJson.toString().getBytes(StandardCharsets.UTF_8);
+ byte[] bytes =
responseJson.toString().getBytes(StandardCharsets.UTF_8);
- exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
- VALUE_HTTP_HEADER_CONTENT_TYPE);
- exchange.sendResponseHeaders(HTTP_RESPOND_CODE, bytes.length);
- os.write(bytes);
- } catch (Exception e) {
- LOGGER.error("Audit handler has exception!", e);
- }
+
exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
+ VALUE_HTTP_HEADER_CONTENT_TYPE);
+ exchange.sendResponseHeaders(HTTP_RESPOND_CODE,
bytes.length);
+ os.write(bytes);
+ } catch (Exception e) {
+ LOGGER.error("Audit handler has exception!", e);
+ } finally {
+ exchange.close();
+ }
+ }
+ });
}
private Map<String, String> parseRequestURI(String query) {
@@ -209,42 +220,47 @@ public class ApiService {
private void handleLegalParams(JsonObject responseJson, Map<String,
String> params) {
List<StatData> statData = null;
- switch (apiType) {
- case MINUTES:
- statData = handleMinutesApi(params);
- break;
- case HOUR:
- String cacheKey =
-
CacheUtils.buildCacheKey(params.get(PARAMS_START_TIME),
params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID),
- params.get(PARAMS_AUDIT_TAG));
- statData = HourCache.getInstance().getData(cacheKey);
- break;
- case DAY:
- statData = DayCache.getInstance().getData(
- params.get(PARAMS_START_TIME),
- params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
- params.get(PARAMS_AUDIT_ID));
- break;
- case GET_IDS:
- statData = RealTimeQuery.getInstance().queryIdsByIp(
- params.get(PARAMS_START_TIME),
- params.get(PARAMS_END_TIME),
- params.get(PARAMS_IP),
- params.get(PARAMS_AUDIT_ID));
- break;
- case GET_IPS:
- statData = RealTimeQuery.getInstance().queryIpsById(
- params.get(PARAMS_START_TIME),
- params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
- params.get(PARAMS_AUDIT_ID));
- break;
- default:
- LOGGER.error("Unsupported interface type! type is {}",
apiType);
+ try {
+ switch (apiType) {
+ case MINUTES:
+ statData = handleMinutesApi(params);
+ break;
+ case HOUR:
+ statData =
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
+ params.get(PARAMS_END_TIME),
+ params.get(PARAMS_INLONG_GROUP_Id),
+ params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_AUDIT_ID),
+ params.get(PARAMS_AUDIT_TAG));
+ break;
+ case DAY:
+ statData = DayCache.getInstance().getData(
+ params.get(PARAMS_START_TIME),
+ params.get(PARAMS_END_TIME),
+ params.get(PARAMS_INLONG_GROUP_Id),
+ params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_AUDIT_ID));
+ break;
+ case GET_IDS:
+ statData = RealTimeQuery.getInstance().queryIdsByIp(
+ params.get(PARAMS_START_TIME),
+ params.get(PARAMS_END_TIME),
+ params.get(PARAMS_IP),
+ params.get(PARAMS_AUDIT_ID));
+ break;
+ case GET_IPS:
+ statData = RealTimeQuery.getInstance().queryIpsById(
+ params.get(PARAMS_START_TIME),
+ params.get(PARAMS_END_TIME),
+ params.get(PARAMS_INLONG_GROUP_Id),
+ params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_AUDIT_ID));
+ break;
+ default:
+ LOGGER.error("Unsupported interface type! type is {}",
apiType);
+ }
+ } catch (Exception exception) {
+ LOGGER.error("Handle legal params has exception ", exception);
}
if (null == statData)
@@ -257,9 +273,6 @@ public class ApiService {
}
private List<StatData> handleMinutesApi(Map<String, String> params) {
- String cacheKey =
CacheUtils.buildCacheKey(params.get(PARAMS_START_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID), params.get(PARAMS_AUDIT_TAG));
int cycle = Integer.parseInt(params.get(PARAMS_AUDIT_CYCLE));
List<StatData> statData = null;
switch (AuditCycle.fromInt(cycle)) {
@@ -271,10 +284,18 @@ public class ApiService {
params.get(PARAMS_AUDIT_ID));
break;
case MINUTE_10:
- statData = TenMinutesCache.getInstance().getData(cacheKey);
+ statData =
TenMinutesCache.getInstance().getData(params.get(PARAMS_START_TIME),
+ params.get(PARAMS_END_TIME),
+ params.get(PARAMS_INLONG_GROUP_Id),
+ params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID),
+ params.get(PARAMS_AUDIT_TAG));
break;
case MINUTE_30:
- statData = HalfHourCache.getInstance().getData(cacheKey);
+ statData =
HalfHourCache.getInstance().getData(params.get(PARAMS_START_TIME),
+ params.get(PARAMS_END_TIME),
+ params.get(PARAMS_INLONG_GROUP_Id),
+ params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID),
+ params.get(PARAMS_AUDIT_TAG));
break;
default:
LOGGER.error("Unsupported cycle type! cycle is {}", cycle);
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index 0e0b7ff4d0..95e1cddd75 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -26,29 +26,22 @@ import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.SinkConfig;
import org.apache.inlong.audit.entities.SourceConfig;
-import org.apache.inlong.audit.selector.api.Selector;
-import org.apache.inlong.audit.selector.api.SelectorConfig;
-import org.apache.inlong.audit.selector.api.SelectorFactory;
import org.apache.inlong.audit.sink.CacheSink;
import org.apache.inlong.audit.sink.JdbcSink;
import org.apache.inlong.audit.source.JdbcSource;
import org.apache.inlong.audit.utils.JdbcUtils;
-import org.apache.inlong.common.util.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
-import java.util.UUID;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
@@ -77,11 +70,8 @@ public class EtlService {
private CacheSink cacheSinkOfTenMinutesCache;
private CacheSink cacheSinkOfHalfHourCache;
private CacheSink cacheSinkOfHourCache;
-
private final int queueSize;
private final int statBackTimes;
- private static Selector selector;
- private boolean running = true;
private final String serviceId;
public EtlService() {
@@ -100,9 +90,6 @@ public class EtlService {
mysqlToTenMinutesCache();
mysqlToHalfHourCache();
mysqlToHourCache();
-
- initSelector();
- waitToBeLeader();
}
/**
@@ -166,7 +153,7 @@ public class EtlService {
* The default audit data cycle is 5 minutes,and stored in a temporary
table.
* Support multiple audit source clusters.
*/
- private void auditSourceToMysql() {
+ public void auditSourceToMysql() {
DataQueue dataQueue = new DataQueue(queueSize);
List<JdbcConfig> sourceList =
ConfigService.getInstance().getAuditSourceByServiceId(serviceId);
for (JdbcConfig jdbcConfig : sourceList) {
@@ -229,64 +216,22 @@ public class EtlService {
jdbcConfig.getDriverClass(),
jdbcConfig.getJdbcUrl(),
jdbcConfig.getUserName(),
- jdbcConfig.getPassword());
- }
-
- /**
- * Init selector
- */
- private void initSelector() {
- JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
- String leaderId = NetworkUtils.getLocalIp() + "-" + UUID.randomUUID();
- LOGGER.info("Init selector. Leader id is :{}", leaderId);
- if (selector == null) {
- SelectorConfig electorConfig = new SelectorConfig(
- serviceId,
- leaderId,
- jdbcConfig.getJdbcUrl(),
- jdbcConfig.getUserName(), jdbcConfig.getPassword(),
jdbcConfig.getDriverClass());
-
- selector = SelectorFactory.getNewElector(electorConfig);
- try {
- selector.init();
- } catch (Exception e) {
- LOGGER.error("Init selector has exception:", e);
- }
- }
- }
-
- /**
- * Wait to be leader
- */
- public void waitToBeLeader() {
- while (running) {
- try {
-
Thread.sleep(Configuration.getInstance().get(KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS,
- DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS));
- } catch (Exception e) {
- LOGGER.error("Wait to be Leader has exception! lost Leadership!", e);
- }
-
- if (selector.isLeader()) {
- LOGGER.info("I get Leadership! Begin to aggregate clickhouse data to mysql");
- auditSourceToMysql();
- return;
- }
- }
+ jdbcConfig.getPassword(),
+ true);
}
/**
* Stop the etl service,and destroy related resources.
*/
public void stop() {
- running = false;
mysqlSourceOfTemp.destroy();
mysqlSinkOfDay.destroy();
for (JdbcSource source : auditJdbcSources) {
source.destroy();
}
- mysqlSinkOfTemp.destroy();
+ if (null != mysqlSinkOfTemp)
+ mysqlSinkOfTemp.destroy();
mysqlSourceOfTenMinutesCache.destroy();
mysqlSourceOfHalfHourCache.destroy();
@@ -295,7 +240,5 @@ public class EtlService {
cacheSinkOfTenMinutesCache.destroy();
cacheSinkOfHalfHourCache.destroy();
cacheSinkOfHourCache.destroy();
-
- selector.close();
}
}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index 6f308ac3a9..db3e76a143 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -137,7 +137,7 @@ public class JdbcSink implements AutoCloseable {
HikariConfig config = new HikariConfig();
config.setDriverClassName(sinkConfig.getDriverClassName());
config.setJdbcUrl(sinkConfig.getJdbcUrl());
- config.setUsername(sinkConfig.getUsername());
+ config.setUsername(sinkConfig.getUserName());
config.setPassword(sinkConfig.getPassword());
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT));
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index 4be0e7cb22..5964f8f84f 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
@@ -82,8 +83,6 @@ public class JdbcSource {
private DataSource dataSource;
private String querySql;
private SourceConfig sourceConfig;
-
- private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final int MAX_MINUTE = 60;
public JdbcSource(DataQueue dataQueue, SourceConfig sourceConfig) {
@@ -281,6 +280,11 @@ public class JdbcSource {
pstat.setString(1, startTime);
pstat.setString(2, endTime);
pstat.setString(3, auditId);
+ if (sourceConfig.isNeedJoin()) {
+ pstat.setString(4, startTime);
+ pstat.setString(5, endTime);
+ pstat.setString(6, auditId);
+ }
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();