This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3d4716983b [INLONG-9977][Audit] Audit-service increases the capabilities of openapi (#9978)
3d4716983b is described below
commit 3d4716983b93df61dd5fe88a8de953b85c79a413
Author: doleyzi <[email protected]>
AuthorDate: Thu Apr 11 21:36:34 2024 +0800
[INLONG-9977][Audit] Audit-service increases the capabilities of openapi (#9978)
---
.../apache/inlong/audit/cache/AbstractCache.java | 22 ++-
.../org/apache/inlong/audit/cache/DayCache.java | 47 ++---
.../org/apache/inlong/audit/channel/DataQueue.java | 4 +-
.../inlong/audit/config/ConfigConstants.java | 61 ++----
.../apache/inlong/audit/config/Configuration.java | 16 +-
.../inlong/audit/config/OpenApiConstants.java | 63 ++++++
.../apache/inlong/audit/config/SqlConstants.java | 43 ++--
.../JdbcConfig.java} | 19 +-
.../org/apache/inlong/audit/entities/StatData.java | 1 +
.../org/apache/inlong/audit/main/Application.java | 57 ++++++
...erImpl.java => SelectorChangeListenerImpl.java} | 8 +-
.../audit/selector/api/SelectorChangeListener.java | 2 +-
.../inlong/audit/selector/api/SelectorConfig.java | 8 +-
.../inlong/audit/selector/api/SelectorFactory.java | 4 +-
.../inlong/audit/selector/impl/DBDataSource.java | 54 ++---
.../inlong/audit/selector/impl/SelectorImpl.java | 14 +-
.../inlong/audit/selector/task/DBMonitorTask.java | 22 +--
.../apache/inlong/audit/service/ApiService.java | 220 +++++++++++++++++++++
.../apache/inlong/audit/service/EtlService.java | 149 ++++++++------
.../org/apache/inlong/audit/sink/CacheSink.java | 4 +-
.../org/apache/inlong/audit/sink/JdbcSink.java | 4 +-
.../org/apache/inlong/audit/source/JdbcSource.java | 49 ++---
.../org/apache/inlong/audit/utils/JdbcUtils.java | 84 ++++++++
inlong-audit/bin/service-start.sh | 85 ++++++++
inlong-audit/bin/service-stop.sh | 42 ++++
inlong-audit/conf/audit-service.properties | 40 ++++
inlong-audit/pom.xml | 1 +
.../apcache_inlong_audit_aggregate.sql | 12 ++
28 files changed, 868 insertions(+), 267 deletions(-)
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
index a492a9269b..7a3ac871bd 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -20,30 +20,30 @@ package org.apache.inlong.audit.cache;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.StatData;
-import org.apache.inlong.audit.source.JdbcSource;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_MAX_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_EXPIRED_HOURS;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_MAX_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
/**
* Abstract cache.
*/
public class AbstractCache {
- private static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractCache.class);
protected final Cache<String, StatData> cache;
protected final ScheduledExecutorService monitorTimer =
Executors.newSingleThreadScheduledExecutor();
protected AuditCycle auditCycle;
@@ -82,7 +82,11 @@ public class AbstractCache {
* @return
*/
public List<StatData> getData(String key) {
- return Arrays.asList(cache.getIfPresent(key));
+ StatData statData = cache.getIfPresent(key);
+ if (null == statData) {
+ return new LinkedList<>();
+ }
+ return Collections.singletonList(statData);
}
/**
@@ -97,6 +101,6 @@ public class AbstractCache {
* Monitor
*/
private void monitor() {
- LOG.info("{} api local cache size={}", auditCycle,
cache.estimatedSize());
+ LOGGER.info("{} api local cache size={}", auditCycle,
cache.estimatedSize());
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
index cd8ceda479..06ca4b75ae 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
@@ -18,7 +18,9 @@
package org.apache.inlong.audit.cache;
import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.JdbcUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
@@ -33,7 +35,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
@@ -44,11 +45,6 @@ import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_C
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
-import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
@@ -61,7 +57,7 @@ import static
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY
*/
public class DayCache implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(DayCache.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DayCache.class);
private static volatile DayCache dayCache = null;
private DataSource dataSource;
@@ -114,21 +110,21 @@ public class DayCache implements AutoCloseable {
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();
- data.setLogTs(startTime);
- data.setInlongGroupId(resultSet.getString(1));
- data.setInlongStreamId(resultSet.getString(2));
- data.setAuditId(resultSet.getString(3));
- data.setAuditTag(resultSet.getString(4));
- data.setCount(resultSet.getLong(5));
- data.setSize(resultSet.getLong(6));
- data.setDelay(resultSet.getLong(7));
+ data.setLogTs(resultSet.getString(1));
+ data.setInlongGroupId(resultSet.getString(2));
+ data.setInlongStreamId(resultSet.getString(3));
+ data.setAuditId(resultSet.getString(4));
+ data.setAuditTag(resultSet.getString(5));
+ data.setCount(resultSet.getLong(6));
+ data.setSize(resultSet.getLong(7));
+ data.setDelay(resultSet.getLong(8));
result.add(data);
}
} catch (SQLException sqlException) {
- LOG.error("Query has SQL exception! ", sqlException);
+ LOGGER.error("Query has SQL exception! ", sqlException);
}
} catch (Exception exception) {
- LOG.error("Query has exception! ", exception);
+ LOGGER.error("Query has exception! ", exception);
}
return result;
}
@@ -137,20 +133,13 @@ public class DayCache implements AutoCloseable {
* Create data source
*/
private void createDataSource() {
- String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER,
KEY_DEFAULT_MYSQL_DRIVER);
- String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
- String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
- String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
- assert (Objects.nonNull(driver)
- && Objects.nonNull(jdbcUrl)
- && Objects.nonNull(userName)
- && Objects.nonNull(passWord));
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
HikariConfig config = new HikariConfig();
- config.setDriverClassName(driver);
- config.setJdbcUrl(jdbcUrl);
- config.setUsername(userName);
- config.setPassword(passWord);
+ config.setDriverClassName(jdbcConfig.getDriverClass());
+ config.setJdbcUrl(jdbcConfig.getJdbcUrl());
+ config.setUsername(jdbcConfig.getUserName());
+ config.setPassword(jdbcConfig.getPassword());
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT));
config.addDataSourceProperty(CACHE_PREP_STMTS,
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
index 3169e9d377..31217c586b 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/channel/DataQueue.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
*/
public class DataQueue {
- private static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataQueue.class);
private final LinkedBlockingQueue<StatData> queue;
@@ -65,6 +65,6 @@ public class DataQueue {
if (queue != null) {
queue.clear();
}
- LOG.info("destroy channel!");
+ LOGGER.info("destroy channel!");
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index 64797df982..8209a1c814 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -37,46 +37,18 @@ public class ConfigConstants {
public static final String KEY_MYSQL_PASSWORD = "mysql.password";
// Time config
- public static final String KEY_QUERY_SQL_TIME_OUT = "query.sql.timeout";
- public static final int DEFAULT_QUERY_SQL_TIME_OUT = 300;
- public static final String KEY_JDBC_TIME_OUT = "jdbc.timeout.second";
- public static final int DEFAULT_JDBC_TIME_OUT = 300;
public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT =
"datasource.connection.timeout.ms";
public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5;
- public static final String KEY_API_RESPONSE_TIMEOUT =
"api.response.timeout";
- public static final int DEFAULT_API_TIMEOUT = 30;
public static final String KEY_QUEUE_PULL_TIMEOUT =
"queue.pull.timeout.ms";
public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
// Interval config
- public static final String KEY_SOURCE_CLICKHOUSE_STAT_INTERVAL =
"source.clickhouse.stat.interval.minute";
- public static final int DEFAULT_SOURCE_CLICKHOUSE_STAT_INTERVAL = 1;
public static final String KEY_SOURCE_DB_STAT_INTERVAL =
"source.db.stat.interval.minute";
public static final int DEFAULT_SOURCE_DB_STAT_INTERVAL = 1;
public static final String KEY_SOURCE_DB_SINK_INTERVAL =
"sink.db.interval.ms";
public static final int DEFAULT_SOURCE_DB_SINK_INTERVAL = 100;
public static final String KEY_SOURCE_DB_SINK_BATCH = "sink.db.batch";
public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
- public static final String KEY_SOURCE_DB_SINK_INTERNAL =
"sink.db.internal.ms";
- public static final int DEFAULT_SOURCE_DB_SINK_INTERNAL = 100;
-
- // Api config
- public static final String KEY_HOUR_API_PATH = "hour.api.path";
- public static final String DEFAULT_HOUR_API_PATH = "/audit/query/hour";
- public static final String KEY_DAY_API_PATH = "day.api.path";
- public static final String DEFAULT_DAY_API_PATH = "/audit/query/day";
- public static final String KEY_DAY_API_TABLE = "day.api.table";
- public static final String DEFAULT_DAY_API_TABLE = "audit_data_day";
- public static final String KEY_MINUTE_API_TABLE = "minute.api.table";
- public static final String DEFAULT_MINUTE_API_TABLE = "audit_data_temp";
- public static final String KEY_MINUTE_10_API_PATH = "minute.10.api.path";
- public static final String DEFAULT_MINUTE_10_API_PATH =
"/audit/query/minute/10";
- public static final String KEY_MINUTE_30_API_PATH = "minute.30.api.path";
- public static final String DEFAULT_MINUTE_30_API_PATH =
"/audit/query/minute/30";
- public static final String KEY_API_POOL_SIZE = "api.pool.size";
- public static final int DEFAULT_POOL_SIZE = 10;
- public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
- public static final int DEFAULT_API_BACKLOG_SIZE = 100;
public static final String KEY_DATASOURCE_POOL_SIZE =
"datasource.pool.size";
public static final int DEFAULT_DATASOURCE_POOL_SIZE = 1000;
@@ -87,19 +59,14 @@ public class ConfigConstants {
public static final String DEFAULT_AUDIT_IDS = "3;4;5;6";
// Summary config
- public static final String KEY_REALTIME_SUMMARY_SOURCE_TABLE =
"realtime.summary.source.table";
- public static final String DEFAULT_REALTIME_SUMMARY_SOURCE_TABLE =
"audit_data";
- public static final String KEY_REALTIME_SUMMARY_SINK_TABLE =
"realtime.summary.sink.table";
- public static final String DEFAULT_REALTIME_SUMMARY_SINK_TABLE =
"audit_data_temp";
- public static final String KEY_REALTIME_SUMMARY_STAT_BACK_TIMES =
"realtime.summary.stat.back.times";
- public static final int DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES = 6;
-
- public static final String KEY_DAILY_SUMMARY_SOURCE_TABLE =
"daily.summary.source.table";
- public static final String DEFAULT_DAILY_SUMMARY_SOURCE_TABLE =
"audit_data_temp";
- public static final String KEY_DAILY_SUMMARY_SINK_TABLE =
"daily.summary.sink.table";
- public static final String DEFAULT_DAILY_SUMMARY_SINK_TABLE =
"audit_data_day";
- public static final String KEY_DAILY_SUMMARY_STAT_BACK_TIMES =
"daily.summary.stat.back.times";
- public static final int DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES = 2;
+ public static final String KEY_SUMMARY_REALTIME_STAT_BACK_TIMES =
"summary.realtime.stat.back.times";
+ public static final int DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES = 6;
+
+ public static final String KEY_SUMMARY_DAILY_STAT_BACK_TIMES =
"summary.daily.stat.back.times";
+ public static final int DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES = 2;
+
+ public static final String KEY_STAT_BACK_INITIAL_OFFSET =
"stat.back.initial.offset";
+ public static final int DEFAULT_STAT_BACK_INITIAL_OFFSET = 0;
// HA selector config
public static final String KEY_RELEASE_LEADER_INTERVAL =
"release.leader.interval";
@@ -107,6 +74,11 @@ public class ConfigConstants {
public static final String KEY_SELECTOR_THREAD_POOL_SIZE =
"selector.thread.pool.size";
public static final int DEFAULT_SELECTOR_THREAD_POOL_SIZE = 3;
+ public static final String KEY_SELECTOR_SERVICE_ID = "selector.service.id";
+ public static final String DEFAULT_SELECTOR_SERVICE_ID = "audit-service";
+ public static final String KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS =
"selector.follower.listen.cycle.ms";
+ public static final int DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS = 2000;
+
// HikariConfig
public static final String CACHE_PREP_STMTS = "cachePrepStmts";
public static final String PREP_STMT_CACHE_SIZE = "prepStmtCacheSize";
@@ -124,11 +96,4 @@ public class ConfigConstants {
public static final int MAX_INIT_COUNT = 2;
public static final int RANDOM_BOUND = 10;
- // Cache config
- public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size";
- public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;
-
- public static final String KEY_API_CACHE_EXPIRED_HOURS =
"api.cache.expired.hours";
- public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
-
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
index c04aaf6aac..acfa076de2 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/Configuration.java
@@ -29,7 +29,7 @@ import java.util.Properties;
*/
public class Configuration {
- private static final Logger LOG =
LoggerFactory.getLogger(Configuration.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Configuration.class);
public static final String DEFAULT_CONFIG_FILE =
"conf/audit-service.properties";
private static volatile Configuration conf = null;
@@ -42,7 +42,7 @@ public class Configuration {
try (FileInputStream fileInputStream = new
FileInputStream(DEFAULT_CONFIG_FILE)) {
properties.load(fileInputStream);
} catch (Exception e) {
- LOG.error("Configuration has exception!", e);
+ LOGGER.error("Configuration has exception!", e);
}
}
@@ -87,6 +87,18 @@ public class Configuration {
return value == null ? defaultValue : (Integer) value;
}
+ /**
+ * Get double value
+ *
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public double get(String key, double defaultValue) {
+ Object value = properties.get(key);
+ return value == null ? defaultValue : (Double) value;
+ }
+
/**
* @param key
* @return
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
new file mode 100644
index 0000000000..8f6a656c17
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.config;
+
+/**
+ * Open api constants
+ */
+public class OpenApiConstants {
+
+ // Api config
+ public static final String KEY_API_HOUR_PATH = "api.hour.path";
+ public static final String DEFAULT_API_HOUR_PATH = "/audit/query/hour";
+ public static final String KEY_API_DAY_PATH = "api.day.path";
+ public static final String DEFAULT_API_DAY_PATH = "/audit/query/day";
+ public static final String KEY_API_MINUTE_10_PATH = "api.minute.10.path";
+ public static final String DEFAULT_API_MINUTE_10_PATH =
"/audit/query/minute/10";
+ public static final String KEY_API_MINUTE_30_PATH = "api.minute.30.path";
+ public static final String DEFAULT_API_MINUTE_30_PATH =
"/audit/query/minute/30";
+ public static final String KEY_API_POOL_SIZE = "api.pool.size";
+ public static final int DEFAULT_POOL_SIZE = 10;
+ public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
+ public static final int DEFAULT_API_BACKLOG_SIZE = 100;
+ public static final String KEY_API_REAL_LIMITER_QPS =
"api.real.limiter.qps";
+ public static final double DEFAULT_API_REAL_LIMITER_QPS = 100.0;
+
+ // Cache config
+ public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size";
+ public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;
+
+ public static final String KEY_API_CACHE_EXPIRED_HOURS =
"api.cache.expired.hours";
+ public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
+
+ // Http config
+ public static final String START_TIME = "startTime";
+ public static final String END_TIME = "endTime";
+ public static final String AUDIT_ID = "auditId";
+ public static final String AUDIT_TAG = "auditTag";
+ public static final String INLONG_GROUP_Id = "inlongGroupId";
+ public static final String INLONG_STREAM_Id = "inlongStreamId";
+ public static final String KEY_HTTP_BODY_SUCCESS = "success";
+ public static final String KEY_HTTP_BODY_ERR_MSG = "errMsg";
+ public static final String KEY_HTTP_BODY_ERR_DATA = "data";
+ public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
+ public static final String VALUE_HTTP_HEADER_CONTENT_TYPE =
"application/json;charset=utf-8";
+ public static final int BIND_PORT = 80;
+ public static final int HTTP_RESPOND_CODE = 200;
+ public static final String DEFAULT_AUDIT_TAG = "";
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 31d73aad81..04b4d8a4ed 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -37,34 +37,25 @@ public class SqlConstants {
// ClickHouse query sql
public static final String KEY_CLICKHOUSE_SOURCE_QUERY_SQL =
"clickhouse.source.query.sql";
public static final String DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL =
- "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
- " , sum(cnt) AS cnt, sum(size) AS size\n" +
- " , sum(delay) AS delay\n" +
+ "SELECT inlong_group_id, inlong_stream_id, audit_id\n" +
+ "\t, audit_tag, cnt, size, delay,MAX(audit_version)\n" +
"FROM (\n" +
- " SELECT max(audit_version), ip, docker_id,
thread_id\n" +
- " , inlong_group_id, inlong_stream_id, audit_id,
audit_tag, cnt\n" +
- " , size, delay\n" +
- " FROM (\n" +
- " SELECT audit_version, ip, docker_id, thread_id,
inlong_group_id\n" +
- " , inlong_stream_id, audit_id, audit_tag,
sum(count) AS cnt\n" +
- " , sum(size) AS size, sum(delay) AS delay\n" +
- " FROM (\n" +
- " SELECT audit_version, docker_id, thread_id,
sdk_ts, packet_id\n" +
- " , log_ts, ip, inlong_group_id,
inlong_stream_id, audit_id\n" +
- " , audit_tag, count, size, delay\n" +
- " FROM audit_data \n" +
- " WHERE log_ts BETWEEN ? AND ? \n" +
- " AND audit_id = ? \n" +
- " GROUP BY audit_version, docker_id, thread_id,
sdk_ts, packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id,
audit_tag, count, size, delay\n"
- +
- " ) t1\n" +
- " GROUP BY audit_version, ip, docker_id, thread_id,
inlong_group_id, inlong_stream_id, audit_id, audit_tag\n"
+ "\tSELECT audit_version, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
+ "\t\t, SUM(count) AS cnt, SUM(size) AS size\n" +
+ "\t\t, SUM(delay) AS delay\n" +
+ "\tFROM (\n" +
+ "\t\tSELECT audit_version, docker_id, thread_id, sdk_ts,
packet_id\n" +
+ "\t\t\t, log_ts, ip, inlong_group_id, inlong_stream_id,
audit_id\n" +
+ "\t\t\t, audit_tag, count, size, delay\n" +
+ "\t\tFROM audit_data\n" +
+ "\t\tWHERE log_ts BETWEEN ? AND ? \n" +
+ "\t\t\tAND audit_id = ? \n" +
+ "\t\tGROUP BY audit_version, docker_id, thread_id, sdk_ts,
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag,
count, size, delay\n"
+
- " ) t2\n" +
- " GROUP BY ip, docker_id, thread_id, inlong_group_id,
inlong_stream_id, audit_id, audit_tag, cnt, size, delay\n"
- +
- ") t3\n" +
- "GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag";
+ "\t) t1\n" +
+ "\tGROUP BY audit_version, inlong_group_id,
inlong_stream_id, audit_id, audit_tag\n" +
+ ") t2\n" +
+ "GROUP BY inlong_group_id, inlong_stream_id, audit_id,
audit_tag, cnt, size, delay";
// Mysql query sql
public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL =
"mysql.query.temp.sql";
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/JdbcConfig.java
similarity index 77%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
copy to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/JdbcConfig.java
index f914269150..ae9487cbbe 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/JdbcConfig.java
@@ -15,12 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.selector.api;
+package org.apache.inlong.audit.entities;
-/**
- * Selector change listener
- */
-public abstract interface SelectorChangeListener {
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class JdbcConfig {
- public abstract void leaderChanged(boolean paramBoolean);
-}
\ No newline at end of file
+ String driverClass;
+ String JdbcUrl;
+ String userName;
+ String password;
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
index b5f450b27d..c6faed687c 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
@Data
public class StatData {
+ private String auditVersion;
private String logTs;
private String inlongGroupId;
private String inlongStreamId;
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
new file mode 100644
index 0000000000..e2cace012c
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.main;
+
+import org.apache.inlong.audit.service.ApiService;
+import org.apache.inlong.audit.service.EtlService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Application {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Application.class);
+ private static final EtlService etlService = new EtlService();
+ private static final ApiService apiService = new ApiService();
+
+ public static void main(String[] args) {
+ try {
+ // Etl service aggregate the data from the data source and store
the aggregated data to the target storage
+ etlService.start();
+
+ // Api service provide audit data interface to external services
+ apiService.start();
+
+ stopIfKilled();
+ } catch (Exception ex) {
+ LOGGER.error("Running exception: ", ex);
+ }
+ }
+
+ private static void stopIfKilled() {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ etlService.stop();
+ apiService.stop();
+ LOGGER.info("Stopping gracefully");
+ } catch (Exception ex) {
+ LOGGER.error("Stop error: ", ex);
+ }
+ }));
+ }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/SelectorChangeListenerImpl.java
similarity index 79%
rename from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
rename to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/SelectorChangeListenerImpl.java
index d579250975..f9ff758d68 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/ElectorChangeListenerImpl.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/SelectorChangeListenerImpl.java
@@ -23,13 +23,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Elector change listener impl
+ * Selector change listener impl
*/
-public class ElectorChangeListenerImpl implements SelectorChangeListener {
+public class SelectorChangeListenerImpl implements SelectorChangeListener {
- private static final Logger logger =
LoggerFactory.getLogger(ElectorChangeListenerImpl.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SelectorChangeListenerImpl.class);
public void leaderChanged(boolean currentNodeIsLeader) {
- logger.info("LeaderChanged {}:", currentNodeIsLeader);
+ LOGGER.info("Leader changed {}:", currentNodeIsLeader);
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
index f914269150..064b84ee2e 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorChangeListener.java
@@ -22,5 +22,5 @@ package org.apache.inlong.audit.selector.api;
*/
public abstract interface SelectorChangeListener {
- public abstract void leaderChanged(boolean paramBoolean);
+ public abstract void leaderChanged(boolean currentNodeIsLeader);
}
\ No newline at end of file
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
index b4dd399021..5110c7b69a 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorConfig.java
@@ -31,7 +31,7 @@ import java.util.Objects;
@Data
public class SelectorConfig {
- private static final Logger logger =
LoggerFactory.getLogger(SelectorConfig.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SelectorConfig.class);
public static final String MONITOR_COMMON_NAME = "audit";
private final String serviceId;
private final String leaderId;
@@ -41,7 +41,7 @@ public class SelectorConfig {
private String dbUrl;
private String dbUser;
private String dbPasswd;
- private String electorDbName = "leader_election";
+ private String selectorDbName = "leader_selector";
private int leaderTimeout = 20;
private int tryToBeLeaderInterval = 5;
private int dbMonitorRunInterval = 20;
@@ -52,7 +52,7 @@ public class SelectorConfig {
private String cachePrepStmts = "true";
private int prepStmtCacheSize = 250;
private int prepStmtCacheSqlLimit = 2048;
- private String monitorName = "elector_leader_state";
+ private String monitorName = "selector_leader_state";
private String ip;
private SelectorChangeListener selectorChangeListener;
@@ -79,7 +79,7 @@ public class SelectorConfig {
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
- logger.error("Get local ip has exception:{}", e.getMessage());
+ LOGGER.error("Get local ip has exception:{}", e.getMessage());
ip = "N/A";
}
return ip;
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
index d7a38fbf39..3f107e48c3 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/api/SelectorFactory.java
@@ -24,7 +24,7 @@ import org.apache.inlong.audit.selector.impl.SelectorImpl;
*/
public class SelectorFactory {
- public static Selector getNewElector(SelectorConfig electorConfig) {
- return new SelectorImpl(electorConfig);
+ public static Selector getNewElector(SelectorConfig selectorConfig) {
+ return new SelectorImpl(selectorConfig);
}
}
\ No newline at end of file
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
index 2751321345..889cf87cb2 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class DBDataSource {
- private static final Logger logger =
LoggerFactory.getLogger(DBDataSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DBDataSource.class);
private String selectorSql = SqlConstants.SELECTOR_SQL;
private String replaceLeaderSql = SqlConstants.REPLACE_LEADER_SQL;
private String reLeaseSql = SqlConstants.RELEASE_SQL;
@@ -63,12 +63,12 @@ public class DBDataSource {
if (!selectorConfig.isUseDefaultLeader()) {
initDataSource();
if (needFormatSql) {
- formatSql(selectorConfig.getElectorDbName(),
selectorConfig.getServiceId(),
+ formatSql(selectorConfig.getSelectorDbName(),
selectorConfig.getServiceId(),
selectorConfig.getLeaderId());
}
}
} catch (Exception exception) {
- logger.error(exception.getMessage());
+ LOGGER.error(exception.getMessage());
throw exception;
}
}
@@ -88,7 +88,7 @@ public class DBDataSource {
if (datasource == null || datasource.isClosed()) {
HikariConfig config = new HikariConfig();
config.setDriverClassName(selectorConfig.getDbDriver());
- logger.info("Init dataSource:{}",
selectorConfig.getDbUrl());
+ LOGGER.info("Init dataSource:{}",
selectorConfig.getDbUrl());
config.setJdbcUrl(selectorConfig.getDbUrl());
config.setUsername(selectorConfig.getDbUser());
config.setPassword(selectorConfig.getDbPasswd());
@@ -107,7 +107,7 @@ public class DBDataSource {
initSucc = true;
} catch (Exception exception) {
- logger.error("DB url:{},user name:{},password:{},exception:{}",
+ LOGGER.error("DB url:{},user name:{},password:{},exception:{}",
selectorConfig.getDbUrl(),
selectorConfig.getDbUser(),
selectorConfig.getDbPasswd(),
@@ -144,15 +144,15 @@ public class DBDataSource {
try (PreparedStatement pstmt =
connection.prepareStatement(sql)) {
result = pstmt.executeUpdate();
} catch (Exception executeUpdatEexception) {
- logger.error("Exception :{}",
executeUpdatEexception.getMessage());
+ LOGGER.error("Exception :{}",
executeUpdatEexception.getMessage());
}
} catch (Exception pstmtEexception) {
- logger.error("Exception :{}", pstmtEexception.getMessage());
+ LOGGER.error("Exception :{}", pstmtEexception.getMessage());
}
getConnectionFailTimes.set(0);
} catch (Exception exception) {
getConnectionFailTimes.addAndGet(1);
- logger.warn("Get Connection fail. {}", exception.getMessage());
+ LOGGER.warn("Get Connection fail. {}", exception.getMessage());
}
return result;
}
@@ -165,12 +165,12 @@ public class DBDataSource {
try {
int result = executeUpdate(selectorSql);
if (result == 2) {
- logger.info("{} get the leader",
selectorConfig.getLeaderId());
+ LOGGER.info("{} get the leader",
selectorConfig.getLeaderId());
} else if (result == 1) {
- logger.info("{} do not get the leader",
selectorConfig.getLeaderId());
+ LOGGER.info("{} do not get the leader",
selectorConfig.getLeaderId());
}
} catch (Exception exception) {
- logger.error("Exception: {} ,sql:{}", exception.getMessage(),
selectorSql);
+ LOGGER.error("Exception: {} ,sql:{}", exception.getMessage(),
selectorSql);
}
}
@@ -187,13 +187,13 @@ public class DBDataSource {
try {
int result = executeUpdate(replaceLeaderSql);
if (result > 0) {
- logger.info("Replace leader success.sql:{}", replaceLeaderSql);
+ LOGGER.info("Replace leader success.sql:{}", replaceLeaderSql);
} else {
- logger.warn("Replace leader failed. sql:" + replaceLeaderSql);
+ LOGGER.warn("Replace leader failed. sql:" + replaceLeaderSql);
}
} catch (Exception exception) {
- logger.error("Exception :{} ", exception.getMessage());
+ LOGGER.error("Exception :{} ", exception.getMessage());
}
}
@@ -203,12 +203,12 @@ public class DBDataSource {
public void releaseLeader() {
try {
int result = executeUpdate(reLeaseSql);
- logger.info("ReleaseLeader sql:{}", reLeaseSql);
+ LOGGER.info("ReleaseLeader sql:{}", reLeaseSql);
if (result == 1) {
- logger.info("{} release the leader success",
selectorConfig.getLeaderId());
+ LOGGER.info("{} release the leader success",
selectorConfig.getLeaderId());
}
} catch (Exception exception) {
- logger.error("ReLease sql:{},exception {}:,", reLeaseSql,
exception.getMessage());
+ LOGGER.error("ReLease sql:{},exception {}:,", reLeaseSql,
exception.getMessage());
}
}
@@ -226,7 +226,7 @@ public class DBDataSource {
try {
if (null == datasource || datasource.isClosed()) {
- logger.warn("DataSource is closed init is again");
+ LOGGER.warn("DataSource is closed init is again");
initDataSource();
}
try (Connection connection = datasource.getConnection()) {
@@ -236,13 +236,13 @@ public class DBDataSource {
leaderId = resultSet.getString("leader");
}
} catch (Exception exception) {
- logger.error("Exception {}", exception.getMessage());
+ LOGGER.error("Exception {}", exception.getMessage());
}
} catch (Throwable connectionException) {
- logger.error("Exception {}",
connectionException.getMessage());
+ LOGGER.error("Exception {}",
connectionException.getMessage());
}
} catch (Exception datasourceException) {
- logger.error("Exception {}", datasourceException.getMessage());
+ LOGGER.error("Exception {}", datasourceException.getMessage());
}
return leaderId;
@@ -263,7 +263,7 @@ public class DBDataSource {
}
return false;
} catch (Exception exception) {
- logger.error("Exception {}", exception.getMessage());
+ LOGGER.error("Exception {}", exception.getMessage());
return true;
}
}
@@ -278,14 +278,14 @@ public class DBDataSource {
public void formatSql(String... params) {
selectorSql = MessageFormat.format(selectorSql, params);
selectorSql = selectorSql.replaceAll("#",
selectorConfig.getLeaderTimeout() + "");
- logger.info(selectorSql);
+ LOGGER.info(selectorSql);
replaceLeaderSql = MessageFormat.format(replaceLeaderSql, params);
- logger.info(replaceLeaderSql);
+ LOGGER.info(replaceLeaderSql);
reLeaseSql = MessageFormat.format(reLeaseSql, params);
- logger.info("ReLeaseSql:{}", reLeaseSql);
+ LOGGER.info("ReLeaseSql:{}", reLeaseSql);
isLeaderSql = MessageFormat.format(isLeaderSql, params);
- logger.info(isLeaderSql);
+ LOGGER.info(isLeaderSql);
searchCurrentLeaderSql = MessageFormat.format(searchCurrentLeaderSql,
params);
- logger.info(searchCurrentLeaderSql);
+ LOGGER.info(searchCurrentLeaderSql);
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
index 45352be5c2..c62de775db 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/SelectorImpl.java
@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
*/
public class SelectorImpl extends Selector {
- private static final Logger logger =
LoggerFactory.getLogger(SelectorImpl.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SelectorImpl.class);
private final SelectorConfig selectorConfig;
private final ExecutorService fixedThreadPool;
private boolean canElector = true;
@@ -60,7 +60,7 @@ public class SelectorImpl extends Selector {
*/
public void init() throws Exception {
try {
- logger.info("Init selector impl...");
+ LOGGER.info("Init selector impl...");
dbDataSource.init(true);
@@ -68,7 +68,7 @@ public class SelectorImpl extends Selector {
fixedThreadPool.execute(new DBMonitorTask(selectorConfig,
dbDataSource));
} catch (Exception exception) {
- logger.error("Failed to init selector", exception);
+ LOGGER.error("Failed to init selector", exception);
}
}
@@ -89,14 +89,14 @@ public class SelectorImpl extends Selector {
try {
dbDataSource.releaseLeader();
} catch (Exception exception) {
- logger.error("Exception :{}", exception.getMessage());
+ LOGGER.error("Exception :{}", exception.getMessage());
}
try {
TimeUnit.SECONDS.sleep(Configuration.getInstance().get(ConfigConstants.KEY_RELEASE_LEADER_INTERVAL,
ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL));
} catch (Exception exception) {
- logger.error("Exception :{}", exception.getMessage());
+ LOGGER.error("Exception :{}", exception.getMessage());
}
}
@@ -142,7 +142,7 @@ public class SelectorImpl extends Selector {
dbDataSource.init(false);
canSelect(true);
} catch (Exception exception) {
- logger.error("Exception :{}", exception.getMessage());
+ LOGGER.error("Exception :{}", exception.getMessage());
return false;
}
return true;
@@ -198,7 +198,7 @@ public class SelectorImpl extends Selector {
try {
TimeUnit.SECONDS.sleep(sleepTime);
} catch (Exception exception) {
- logger.error("Exception :{}", exception.getMessage());
+ LOGGER.error("Exception :{}", exception.getMessage());
}
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
index c1dc8e3d5c..4d980aa632 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/task/DBMonitorTask.java
@@ -30,34 +30,34 @@ import java.util.concurrent.TimeUnit;
*/
public class DBMonitorTask implements Runnable {
- private static final Logger logger =
LoggerFactory.getLogger(DBMonitorTask.class);
- private SelectorConfig electorConfig;
- private DBDataSource dbDataSource;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DBMonitorTask.class);
+ private final SelectorConfig selectorConfig;
+ private final DBDataSource dbDataSource;
private int dbClosedTimes = 0;
- private boolean replaced = true;
+ private final boolean replaced = true;
- public DBMonitorTask(SelectorConfig electorConfig, DBDataSource
dbDataSource) {
- this.electorConfig = electorConfig;
+ public DBMonitorTask(SelectorConfig selectorConfig, DBDataSource
dbDataSource) {
+ this.selectorConfig = selectorConfig;
this.dbDataSource = dbDataSource;
}
public void run() {
try {
while (true) {
- logger.info("DB monitor task run once");
-
TimeUnit.SECONDS.sleep(electorConfig.getDbMonitorRunInterval());
+ LOGGER.info("DB monitor task run once");
+
TimeUnit.SECONDS.sleep(selectorConfig.getDbMonitorRunInterval());
- if (!(electorConfig.isUseDefaultLeader()))
+ if (!(selectorConfig.isUseDefaultLeader()))
break;
}
if (dbDataSource.isDBDataSourceClosed()) {
dbClosedTimes += 1;
- logger.info("DB closed times :{}", dbClosedTimes);
+ LOGGER.info("DB closed times :{}", dbClosedTimes);
} else {
dbClosedTimes = 0;
}
} catch (Exception e) {
- logger.error("DB monitor task has exception {}", e.getMessage());
+ LOGGER.error("DB monitor task has exception {}", e.getMessage());
}
}
}
\ No newline at end of file
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
new file mode 100644
index 0000000000..6dac009c70
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.cache.DayCache;
+import org.apache.inlong.audit.cache.HalfHourCache;
+import org.apache.inlong.audit.cache.HourCache;
+import org.apache.inlong.audit.cache.TenMinutesCache;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.CacheUtils;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import static org.apache.inlong.audit.config.OpenApiConstants.AUDIT_ID;
+import static org.apache.inlong.audit.config.OpenApiConstants.AUDIT_TAG;
+import static org.apache.inlong.audit.config.OpenApiConstants.BIND_PORT;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTE_10_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTE_30_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_AUDIT_TAG;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_POOL_SIZE;
+import static org.apache.inlong.audit.config.OpenApiConstants.END_TIME;
+import static
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
+import static org.apache.inlong.audit.config.OpenApiConstants.INLONG_GROUP_Id;
+import static org.apache.inlong.audit.config.OpenApiConstants.INLONG_STREAM_Id;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
+import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTE_10_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTE_30_PATH;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_POOL_SIZE;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
+import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
+import static org.apache.inlong.audit.config.OpenApiConstants.START_TIME;
+import static
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
+
+/**
+ * Embedded HTTP server that exposes the cached audit statistics through the open API.
+ *
+ * <p>One context is registered per {@link AuditCycle} (day, hour, 10 minutes, 30 minutes); the
+ * context paths, backlog size, worker pool size and rate limit are read from
+ * {@link Configuration}.
+ */
+public class ApiService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ApiService.class);
+
+    // Kept as fields (instead of locals) so that stop() can release the port and the worker threads.
+    private HttpServer server;
+    private java.util.concurrent.ExecutorService executor;
+
+    /**
+     * Start the HTTP server. Failures are logged, not propagated.
+     */
+    public void start() {
+        initHttpServer();
+    }
+
+    /**
+     * Stop the HTTP server and shut down its worker pool. Safe to call when the server was never
+     * started, and safe to call more than once.
+     */
+    public void stop() {
+        if (server != null) {
+            // Delay 0: do not wait for in-flight exchanges.
+            server.stop(0);
+            server = null;
+        }
+        if (executor != null) {
+            executor.shutdown();
+            executor = null;
+        }
+    }
+
+    /**
+     * Create the server, bind it to {@code BIND_PORT}, register one handler per audit cycle and
+     * start serving.
+     */
+    private void initHttpServer() {
+        try {
+            server = HttpServer.create(new InetSocketAddress(BIND_PORT),
+                    Configuration.getInstance().get(KEY_API_BACKLOG_SIZE, DEFAULT_API_BACKLOG_SIZE));
+            executor = Executors.newFixedThreadPool(
+                    Configuration.getInstance().get(KEY_API_POOL_SIZE, DEFAULT_POOL_SIZE));
+            server.setExecutor(executor);
+            server.createContext(Configuration.getInstance().get(KEY_API_DAY_PATH, DEFAULT_API_DAY_PATH),
+                    new AuditHandler(AuditCycle.DAY));
+            server.createContext(Configuration.getInstance().get(KEY_API_HOUR_PATH, DEFAULT_API_HOUR_PATH),
+                    new AuditHandler(AuditCycle.HOUR));
+            server.createContext(
+                    Configuration.getInstance().get(KEY_API_MINUTE_10_PATH, DEFAULT_API_MINUTE_10_PATH),
+                    new AuditHandler(AuditCycle.MINUTE_10));
+            server.createContext(
+                    Configuration.getInstance().get(KEY_API_MINUTE_30_PATH, DEFAULT_API_MINUTE_30_PATH),
+                    new AuditHandler(AuditCycle.MINUTE_30));
+            server.start();
+        } catch (Exception e) {
+            LOGGER.error("Init http server has exception!", e);
+            // Do not leave a half-initialised server or an idle thread pool behind.
+            stop();
+        }
+    }
+
+    /**
+     * Handles the query requests of one audit cycle and answers with a JSON body.
+     */
+    static class AuditHandler implements HttpHandler, AutoCloseable {
+
+        // Shared instance: avoids building a new Gson for every request.
+        private static final Gson GSON = new Gson();
+
+        private final AuditCycle apiType;
+        private final RateLimiter limiter;
+
+        /**
+         * @param apiType audit cycle served by this handler; selects the cache that is queried
+         */
+        public AuditHandler(AuditCycle apiType) {
+            this.apiType = apiType;
+            limiter = RateLimiter.create(Configuration.getInstance().get(KEY_API_REAL_LIMITER_QPS,
+                    DEFAULT_API_REAL_LIMITER_QPS));
+        }
+
+        /**
+         * Parse the query string, look up the matching statistics and write the JSON response.
+         * Exceptions are logged; the exchange is always closed so the client is never left
+         * waiting on a connection that will not be answered.
+         *
+         * @param exchange the HTTP exchange to answer
+         */
+        @Override
+        public void handle(HttpExchange exchange) {
+            // Blocks the worker thread until the rate limiter grants a permit.
+            limiter.acquire();
+
+            try (OutputStream os = exchange.getResponseBody()) {
+                JsonObject responseJson = new JsonObject();
+
+                Map<String, String> params = parseRequestURI(exchange.getRequestURI().getQuery());
+                if (!checkParams(params)) {
+                    handleInvalidParams(responseJson, exchange);
+                } else {
+                    handleLegalParams(responseJson, params);
+                }
+
+                byte[] bytes = responseJson.toString().getBytes(StandardCharsets.UTF_8);
+
+                exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE, VALUE_HTTP_HEADER_CONTENT_TYPE);
+                exchange.sendResponseHeaders(HTTP_RESPOND_CODE, bytes.length);
+                os.write(bytes);
+            } catch (Exception e) {
+                LOGGER.error("Audit handler has exception!", e);
+            } finally {
+                // Releases the exchange even when the failure happened before the headers were sent.
+                exchange.close();
+            }
+        }
+
+        /**
+         * Split a decoded query string into key/value pairs.
+         *
+         * @param query decoded query part of the request URI, may be {@code null}
+         * @return the parsed parameters; {@code AUDIT_TAG} falls back to {@code DEFAULT_AUDIT_TAG}
+         */
+        private Map<String, String> parseRequestURI(String query) {
+            Map<String, String> params = new HashMap<>();
+            if (query != null) {
+                for (String pair : query.split("&")) {
+                    // Limit 2: only the first '=' separates key and value, so values that
+                    // themselves contain '=' are kept instead of being silently dropped.
+                    String[] keyValue = pair.split("=", 2);
+                    // Pairs without '=' or with an empty value are still ignored.
+                    if (keyValue.length == 2 && !keyValue[1].isEmpty()) {
+                        params.put(keyValue[0], keyValue[1]);
+                    }
+                }
+            }
+            params.putIfAbsent(AUDIT_TAG, DEFAULT_AUDIT_TAG);
+            return params;
+        }
+
+        /**
+         * @return {@code true} when every mandatory query parameter is present
+         */
+        private boolean checkParams(Map<String, String> params) {
+            return params.containsKey(START_TIME)
+                    && params.containsKey(END_TIME)
+                    && params.containsKey(AUDIT_ID)
+                    && params.containsKey(INLONG_GROUP_Id)
+                    && params.containsKey(INLONG_STREAM_Id);
+        }
+
+        /**
+         * Fill the response for a request with missing parameters: failure flag, an error message
+         * echoing the request URI, and an empty data list.
+         */
+        private void handleInvalidParams(JsonObject responseJson, HttpExchange exchange) {
+            responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, false);
+            responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "Invalid params! " + exchange.getRequestURI());
+            responseJson.add(KEY_HTTP_BODY_ERR_DATA, GSON.toJsonTree(new LinkedList<>()));
+        }
+
+        /**
+         * Query the cache that belongs to this handler's audit cycle and fill the response with
+         * the result; an empty list is returned when nothing is found.
+         */
+        private void handleLegalParams(JsonObject responseJson, Map<String, String> params) {
+            String cacheKey = CacheUtils.buildCacheKey(params.get(START_TIME), params.get(INLONG_GROUP_Id),
+                    params.get(INLONG_STREAM_Id), params.get(AUDIT_ID), params.get(AUDIT_TAG));
+            LOGGER.info("handleLegalParams cacheKey {}", cacheKey);
+            List<StatData> statData = null;
+            switch (apiType) {
+                case MINUTE_10:
+                    statData = TenMinutesCache.getInstance().getData(cacheKey);
+                    break;
+                case MINUTE_30:
+                    statData = HalfHourCache.getInstance().getData(cacheKey);
+                    break;
+                case HOUR:
+                    statData = HourCache.getInstance().getData(cacheKey);
+                    break;
+                case DAY:
+                    // The day cache is queried by time range rather than by a single cache key.
+                    statData = DayCache.getInstance().getData(
+                            params.get(START_TIME),
+                            params.get(END_TIME),
+                            params.get(INLONG_GROUP_Id),
+                            params.get(INLONG_STREAM_Id),
+                            params.get(AUDIT_ID),
+                            params.get(AUDIT_TAG));
+                    break;
+                default:
+                    LOGGER.error("Unsupported interface type! type is {}", apiType);
+            }
+            if (null == statData) {
+                statData = new LinkedList<>();
+            }
+
+            responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, true);
+            responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "");
+            responseJson.add(KEY_HTTP_BODY_ERR_DATA, GSON.toJsonTree(statData));
+        }
+
+        /**
+         * Nothing to release: the handler owns no closeable resources.
+         */
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index 0574ac3771..eb3f40e5b4 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -23,33 +23,33 @@ import org.apache.inlong.audit.cache.TenMinutesCache;
import org.apache.inlong.audit.channel.DataQueue;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.SinkConfig;
import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.selector.api.Selector;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.api.SelectorFactory;
import org.apache.inlong.audit.sink.CacheSink;
import org.apache.inlong.audit.sink.JdbcSink;
import org.apache.inlong.audit.source.JdbcSource;
+import org.apache.inlong.audit.utils.JdbcUtils;
+import org.apache.inlong.common.util.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Objects;
+import java.util.UUID;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_DRIVER;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_JDBC_URL;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_PASSWORD;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_USERNAME;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DAILY_SUMMARY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
-import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_REALTIME_SUMMARY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_DAY_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL;
@@ -64,7 +64,7 @@ import static
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY
*/
public class EtlService {
- private static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EtlService.class);
private JdbcSource mysqlSourceOfTemp;
private JdbcSource mysqlSourceOfTenMinutesCache;
private JdbcSource mysqlSourceOfHalfHourCache;
@@ -77,23 +77,27 @@ public class EtlService {
private CacheSink cacheSinkOfHourCache;
private final int queueSize;
private final int statBackTimes;
+ private static Selector selector;
+ private boolean running = true;
public EtlService() {
queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
DEFAULT_DATA_QUEUE_SIZE);
- statBackTimes =
Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES,
- DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES);
+ statBackTimes =
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+ DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
}
/**
* Start the etl service.
*/
public void start() {
- clickhouseToMysql();
mysqlToMysqlOfDay();
mysqlToTenMinutesCache();
mysqlToHalfHourCache();
mysqlToHourCache();
+
+ initSelector();
+ waitToBeLeader();
}
/**
@@ -104,8 +108,8 @@ public class EtlService {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfTemp = new JdbcSource(dataQueue,
buildMysqlSourceConfig(AuditCycle.DAY,
-
Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES,
- DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES)));
+
Configuration.getInstance().get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES,
+ DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES)));
mysqlSourceOfTemp.start();
SinkConfig sinkConfig =
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
@@ -175,21 +179,13 @@ public class EtlService {
* @return
*/
private SinkConfig buildMysqlSinkConfig(String insertSql) {
- String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER,
KEY_DEFAULT_MYSQL_DRIVER);
- String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
- String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
- String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
- assert (Objects.nonNull(driver)
- && Objects.nonNull(jdbcUrl)
- && Objects.nonNull(userName)
- && Objects.nonNull(passWord));
-
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
return new SinkConfig(
insertSql,
- driver,
- jdbcUrl,
- userName,
- passWord);
+ jdbcConfig.getDriverClass(),
+ jdbcConfig.getJdbcUrl(),
+ jdbcConfig.getUserName(),
+ jdbcConfig.getPassword());
}
/**
@@ -198,22 +194,15 @@ public class EtlService {
* @return
*/
private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int
statBackTimes) {
- String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER,
KEY_DEFAULT_MYSQL_DRIVER);
- String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
- String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
- String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
- assert (Objects.nonNull(driver)
- && Objects.nonNull(jdbcUrl)
- && Objects.nonNull(userName));
-
+ JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
return new SourceConfig(auditCycle,
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
statBackTimes,
- driver,
- jdbcUrl,
- userName,
- passWord);
+ jdbcConfig.getDriverClass(),
+ jdbcConfig.getJdbcUrl(),
+ jdbcConfig.getUserName(),
+ jdbcConfig.getPassword());
}
/**
@@ -222,30 +211,66 @@ public class EtlService {
* @return
*/
private SourceConfig buildClickhouseSourceConfig() {
- String driver = Configuration.getInstance().get(KEY_CLICKHOUSE_DRIVER,
DEFAULT_CLICKHOUSE_DRIVER);
- String jdbcUrl =
Configuration.getInstance().get(KEY_CLICKHOUSE_JDBC_URL);
- String userName =
Configuration.getInstance().get(KEY_CLICKHOUSE_USERNAME);
- String passWord =
Configuration.getInstance().get(KEY_CLICKHOUSE_PASSWORD);
- assert (Objects.nonNull(driver)
- && Objects.nonNull(jdbcUrl)
- && Objects.nonNull(userName)
- && Objects.nonNull(passWord));
-
+ JdbcConfig jdbcConfig = JdbcUtils.buildClickhouseConfig();
return new SourceConfig(AuditCycle.MINUTE_5,
Configuration.getInstance().get(KEY_CLICKHOUSE_SOURCE_QUERY_SQL,
DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL),
-
Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES,
- DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES),
- driver,
- jdbcUrl,
- userName,
- passWord);
+
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+ DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES),
+ jdbcConfig.getDriverClass(),
+ jdbcConfig.getJdbcUrl(),
+ jdbcConfig.getUserName(),
+ jdbcConfig.getPassword());
+ }
+
+ /**
+  * Create and initialise the shared leader selector if it does not exist yet.
+  * The leader id is built from the local IP and a random UUID; an initialisation failure is
+  * logged and not propagated.
+  */
+ private void initSelector() {
+     JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+     String leaderId = NetworkUtils.getLocalIp() + "-" + UUID.randomUUID();
+     LOGGER.info("Init selector. Leader id is :{}", leaderId);
+     if (selector != null) {
+         // Already created by an earlier call; nothing more to do.
+         return;
+     }
+
+     selector = SelectorFactory.getNewElector(new SelectorConfig(
+             Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID, DEFAULT_SELECTOR_SERVICE_ID),
+             leaderId,
+             jdbcConfig.getJdbcUrl(),
+             jdbcConfig.getUserName(),
+             jdbcConfig.getPassword(),
+             jdbcConfig.getDriverClass()));
+     try {
+         selector.init();
+     } catch (Exception e) {
+         LOGGER.error("Init selector has exception:", e);
+     }
+ }
+
+ /**
+  * Poll the selector until this node becomes leader, then start aggregating ClickHouse data
+  * into MySQL and return. The poll interval is read from the configuration.
+  *
+  * <p>The loop also ends when {@code running} becomes false or when the waiting thread is
+  * interrupted; in the latter case the interrupt flag is restored and aggregation is not started.
+  */
+ public void waitToBeLeader() {
+     while (running) {
+         try {
+             Thread.sleep(Configuration.getInstance().get(KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS,
+                     DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS));
+         } catch (InterruptedException e) {
+             // Thread.sleep clears the interrupt flag; restore it so callers can observe the
+             // interruption, and stop waiting instead of silently looping on.
+             Thread.currentThread().interrupt();
+             LOGGER.warn("Interrupted while waiting to be Leader, stop waiting", e);
+             return;
+         } catch (Exception e) {
+             LOGGER.error("Wait to be Leader has exception! lost Leadership!", e);
+         }
+
+         if (selector.isLeader()) {
+             LOGGER.info("I get Leadership! Begin to aggregate clickhouse data to mysql");
+             clickhouseToMysql();
+             return;
+         }
+     }
+ }
/**
* Stop the etl service,and destroy related resources.
*/
public void stop() {
+ running = false;
mysqlSourceOfTemp.destroy();
mysqlSinkOfDay.destroy();
@@ -259,5 +284,7 @@ public class EtlService {
cacheSinkOfTenMinutesCache.destroy();
cacheSinkOfHalfHourCache.destroy();
cacheSinkOfHourCache.destroy();
+
+ selector.close();
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
index 8077817b1d..6f5f809a3d 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
@@ -40,7 +40,7 @@ import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_
*/
public class CacheSink {
- private static final Logger LOG = LoggerFactory.getLogger(CacheSink.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CacheSink.class);
private final ScheduledExecutorService sinkTimer =
Executors.newSingleThreadScheduledExecutor();
private final DataQueue dataQueue;
private final Cache<String, StatData> cache;
@@ -77,7 +77,7 @@ public class CacheSink {
data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS);
}
} catch (Exception exception) {
- LOG.error("Process exception! ", exception);
+ LOGGER.error("Process exception! ", exception);
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index 451cbdc50a..6f308ac3a9 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -60,7 +60,7 @@ import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL
*/
public class JdbcSink implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(JdbcSink.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcSink.class);
private final ScheduledExecutorService sinkTimer =
Executors.newSingleThreadScheduledExecutor();
private final DataQueue dataQueue;
private final int insertBatch;
@@ -126,7 +126,7 @@ public class JdbcSink implements AutoCloseable {
preparedStatement.clearBatch();
}
} catch (Exception e) {
- LOG.error("Process exception! {}", e.getMessage());
+ LOGGER.error("Process exception! {}", e.getMessage());
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index e4e8e1dd0d..ec828a4bf6 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -56,6 +56,7 @@ import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_BACK_INITIAL_OFFSET;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_IDS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
@@ -63,8 +64,10 @@ import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITIAL_OFFSET;
import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
import static
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.audit.entities.AuditCycle.DAY;
import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
@@ -74,7 +77,7 @@ import static
org.apache.inlong.audit.entities.AuditCycle.HOUR;
@Data
public class JdbcSource {
- private static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcSource.class);
private final ConcurrentHashMap<Integer, ScheduledExecutorService>
statTimers = new ConcurrentHashMap<>();
private DataQueue dataQueue;
private List<String> auditIds;
@@ -106,12 +109,14 @@ public class JdbcSource {
if (sourceConfig.getAuditCycle() == DAY) {
statInterval = HOUR.getValue();
}
- for (int statBackTime = 1; statBackTime <
sourceConfig.getStatBackTimes(); statBackTime++) {
+ int offset =
Configuration.getInstance().get(KEY_STAT_BACK_INITIAL_OFFSET,
+ DEFAULT_STAT_BACK_INITIAL_OFFSET);
+ for (int statBackTime = 0; statBackTime <
sourceConfig.getStatBackTimes(); statBackTime++) {
ScheduledExecutorService timer =
statTimers.computeIfAbsent(statBackTime, k ->
Executors.newSingleThreadScheduledExecutor());
- timer.scheduleWithFixedDelay(new StatServer(statBackTime),
+ timer.scheduleWithFixedDelay(new StatServer(offset++),
0,
- statInterval, TimeUnit.MINUTES);
+ statInterval + statBackTime, TimeUnit.MINUTES);
}
}
@@ -218,12 +223,12 @@ public class JdbcSource {
public void run() {
long currentTimestamp = System.currentTimeMillis();
- LOG.info("Stat source data at {},stat back times:{}",
currentTimestamp, statBackTimes);
+ LOGGER.info("Stat source data at {},stat back times:{}",
currentTimestamp, statBackTimes);
statByStep();
long timeCost = System.currentTimeMillis() - currentTimestamp;
- LOG.info("Stat source data cost time:{}ms,stat back times:{}",
timeCost, statBackTimes);
+ LOGGER.info("Stat source data cost time:{}ms,stat back times:{}",
timeCost, statBackTimes);
}
/**
@@ -255,7 +260,7 @@ public class JdbcSource {
long currentTimestamp = System.currentTimeMillis();
query(statCycle.getStartTime(), statCycle.getEndTime(),
auditId);
long timeCost = System.currentTimeMillis() - currentTimestamp;
- LOG.info("[{}]-[{}],{},stat back times:{},audit
id:{},cost:{}ms",
+ LOGGER.info("[{}]-[{}],{},stat back times:{},audit
id:{},cost:{}ms",
statCycle.getStartTime(), statCycle.getEndTime(),
sourceConfig.getAuditCycle(),
statBackTimes, auditId, timeCost);
@@ -280,29 +285,27 @@ public class JdbcSource {
pstat.setString(3, auditId);
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
- String inlongGroupID = resultSet.getString(1);
- String InlongStreamID = resultSet.getString(2);
- String AuditId = resultSet.getString(3);
- String AuditTag = resultSet.getString(4);
- long count = resultSet.getLong(5);
- long size = resultSet.getLong(6);
- long delay = resultSet.getLong(7);
StatData data = new StatData();
data.setLogTs(startTime);
- data.setInlongGroupId(inlongGroupID);
- data.setInlongStreamId(InlongStreamID);
- data.setAuditId(AuditId);
- data.setAuditTag(AuditTag);
- data.setCount(count);
- data.setSize(size);
- data.setDelay(delay);
+ data.setInlongGroupId(resultSet.getString(1));
+ data.setInlongStreamId(resultSet.getString(2));
+ data.setAuditId(resultSet.getString(3));
+ String auditTag = resultSet.getString(4);
+ if (null == auditTag) {
+ data.setAuditTag(DEFAULT_AUDIT_TAG);
+ } else {
+ data.setAuditTag(auditTag);
+ }
+ data.setCount(resultSet.getLong(5));
+ data.setSize(resultSet.getLong(6));
+ data.setDelay(resultSet.getLong(7));
dataQueue.push(data);
}
} catch (SQLException sqlException) {
- LOG.error("Query has SQL exception! ", sqlException);
+ LOGGER.error("Query has SQL exception! ", sqlException);
}
} catch (Exception exception) {
- LOG.error("Query has exception! ", exception);
+ LOGGER.error("Query has exception! ", exception);
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
new file mode 100644
index 0000000000..97c28d68be
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+
+import java.util.Objects;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_DRIVER;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_JDBC_URL;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_PASSWORD;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_USERNAME;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
+
+/**
+ * Jdbc utils
+ */
+public class JdbcUtils {
+
+ /**
+ * Build mysql config
+ * @return
+ */
+ public static JdbcConfig buildMysqlConfig() {
+ return doBuild(Configuration.getInstance().get(KEY_MYSQL_DRIVER,
KEY_DEFAULT_MYSQL_DRIVER),
+ Configuration.getInstance().get(KEY_MYSQL_JDBC_URL),
+ Configuration.getInstance().get(KEY_MYSQL_USERNAME),
+ Configuration.getInstance().get(KEY_MYSQL_PASSWORD));
+ }
+
+ /**
+ * Build clickhouse config
+ * @return
+ */
+ public static JdbcConfig buildClickhouseConfig() {
+ return doBuild(
+ Configuration.getInstance().get(KEY_CLICKHOUSE_DRIVER,
DEFAULT_CLICKHOUSE_DRIVER),
+ Configuration.getInstance().get(KEY_CLICKHOUSE_JDBC_URL),
+ Configuration.getInstance().get(KEY_CLICKHOUSE_USERNAME),
+ Configuration.getInstance().get(KEY_CLICKHOUSE_PASSWORD));
+ }
+
+ /**
+ * Do build config
+ * @param driverClass
+ * @param jdbcUrl
+ * @param userName
+ * @param password
+ * @return
+ */
+ private static JdbcConfig doBuild(String driverClass, String jdbcUrl,
String userName, String password) {
+ assert (Objects.nonNull(driverClass)
+ && Objects.nonNull(jdbcUrl)
+ && Objects.nonNull(userName)
+ && Objects.nonNull(password));
+
+ return new JdbcConfig(
+ driverClass,
+ jdbcUrl,
+ userName,
+ password);
+ }
+}
diff --git a/inlong-audit/bin/service-start.sh
b/inlong-audit/bin/service-start.sh
new file mode 100644
index 0000000000..f1ea65a31e
--- /dev/null
+++ b/inlong-audit/bin/service-start.sh
@@ -0,0 +1,85 @@
+#!/bin/bash
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+BASE_DIR=$(cd "$(dirname "$0")"/../;pwd)
+
+# Enter the root directory path
+# shellcheck disable=SC2164
+cd "$BASE_DIR"
+
+# Prepare common dependency
+ROOT_DIR=$BASE_DIR/../..
+if [ -e $ROOT_DIR/bin/prepare_module_dependencies.sh ]; then
+ $ROOT_DIR/bin/prepare_module_dependencies.sh ./inlong-audit/lib
+fi
+
+PID=$(ps -ef | grep "audit-service" | grep -v grep | awk '{ print $2}')
+LOG_DIR="${BASE_DIR}/logs"
+
+if [ -n "$PID" ]; then
+ echo "Application has already started."
+ exit 0
+fi
+
+if [[ -z $JAVA_HOME ]]; then
+ JAVA=$(which java)
+ if [ $? != 0 ]; then
+ echo "Error: JAVA_HOME not set, and no java executable found in
$PATH." 1>&2
+ exit 1
+ fi
+else
+ JAVA=$JAVA_HOME/bin/java
+fi
+
+if [ ! -d "${LOG_DIR}" ]; then
+ mkdir "${LOG_DIR}"
+fi
+
+JAVA_OPTS="-server -XX:SurvivorRatio=2 -XX:+UseParallelGC"
+
+if [ -z "$AUDIT_JVM_HEAP_OPTS" ]; then
+ HEAP_OPTS="-Xms1g -Xmx1g"
+else
+ HEAP_OPTS="$AUDIT_JVM_HEAP_OPTS"
+fi
+JAVA_OPTS="${JAVA_OPTS} ${HEAP_OPTS}"
+
+# shellcheck disable=SC2010
+SERVER_JAR=$(ls -lt "${BASE_DIR}"/lib |grep audit-service | head -2 | tail -1
| awk '{print $NF}')
+
+nohup "$JAVA" $JAVA_OPTS -Daudit.log.path="$LOG_DIR"
-Dloader.path="$BASE_DIR/conf,$BASE_DIR/lib/" -jar "$BASE_DIR/lib/$SERVER_JAR"
1>"${LOG_DIR}"/service.log 2>"${LOG_DIR}"/service-error.log &
+
+PID_FILE="$BASE_DIR/bin/PID"
+
+# shellcheck disable=SC2009
+PID=$(ps -ef | grep "$BASE_DIR" | grep "$SERVER_JAR"| grep -v grep | awk '{
print $2}')
+
+sleep 3
+
+if [ -n "$PID" ]; then
+ echo -n "$PID" > "$PID_FILE"
+ echo "Application started."
+ exit 0
+else
+ echo "Application start failed."
+ exit 0
+fi
diff --git a/inlong-audit/bin/service-stop.sh b/inlong-audit/bin/service-stop.sh
new file mode 100644
index 0000000000..75cca34949
--- /dev/null
+++ b/inlong-audit/bin/service-stop.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# this program kills the audit service
+for i in {1..5}
+do
+ pid=$(ps aux | grep "audit-service" | grep -v "grep" | awk '{print $2}')
+ if [ -z "$pid" ]; then
+ echo "audit-service is not running"
+ break
+ else
+ kill $pid
+ fi
+ sleep 10
+done
+
+pid=$(ps aux | grep "audit-service" | grep -v "grep" | awk '{print $2}')
+if [ -z "$pid" ]; then
+ echo "audit-service has stopped"
+else
+ kill -9 $pid
+fi
+
+echo "Stop audit-service successfully."
diff --git a/inlong-audit/conf/audit-service.properties
b/inlong-audit/conf/audit-service.properties
new file mode 100644
index 0000000000..66fa9e0fd5
--- /dev/null
+++ b/inlong-audit/conf/audit-service.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# clickhouse config
+clickhouse.jdbc.url=jdbc:clickhouse://*****:***/db_inlong_audit?socket_timeout=600000
+clickhouse.username=*****
+clickhouse.password=*****
+
+# mysql config
+mysql.jdbc.url=jdbc:mysql://*****:***/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true
+mysql.username=*****
+mysql.password=*****
+
+# summary config
+summary.realtime.stat.back.times=6
+summary.daily.stat.back.times=2
+audit.ids=3;4;5;6
+
+# api config
+api.cache.max.size=50000000
+api.cache.expired.hours=12
+api.real.limiter.qps=1000.0
+api.pool.size=10
+api.backlog.size=100
+
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index fa98312e8b..53996d2917 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -33,6 +33,7 @@
<modules>
<module>audit-proxy</module>
<module>audit-store</module>
+ <module>audit-service</module>
<module>audit-common</module>
<module>audit-sdk</module>
<module>audit-release</module>
diff --git a/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
b/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
index 5794355533..5e15717bff 100644
--- a/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
+++ b/inlong-audit/sql/audit-service/apcache_inlong_audit_aggregate.sql
@@ -66,3 +66,15 @@ CREATE TABLE IF NOT EXISTS `audit_data_day`
PRIMARY KEY
(`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table';
+
+-- ----------------------------
+-- Table structure for selector
+-- ----------------------------
+-- The primary key on service_id allows at most one row per service.
+-- NOTE(review): leader_id / last_seen_active presumably hold the current leader and the time
+-- it was last seen alive - confirm against the selector implementation.
+CREATE TABLE IF NOT EXISTS `leader_selector` (
+    `service_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+    `leader_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+    `last_seen_active` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (`service_id`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'selector db';
+
+