This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8dcf31a5ad [INLONG-9918][Audit] Audit-service add codes of config
(#9919)
8dcf31a5ad is described below
commit 8dcf31a5adbf5642d6bd4eb44a0e161b748d9b4e
Author: doleyzi <[email protected]>
AuthorDate: Sun Apr 7 11:26:21 2024 +0800
[INLONG-9918][Audit] Audit-service add codes of config (#9919)
---
.../src/main/java/channel/DataQueue.java | 69 ++++++++++++
.../src/main/java/config/ConfigConstants.java | 104 +++++++++++++++++
.../src/main/java/config/Configuration.java | 93 +++++++++++++++
.../{SourceEntities.java => SourceConfig.java} | 7 +-
.../src/main/java/source/AbstractSource.java | 125 +++++++++++++++++++++
.../SourceEntities.java => source/SourceStat.java} | 20 ++--
6 files changed, 404 insertions(+), 14 deletions(-)
diff --git a/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
b/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
new file mode 100644
index 0000000000..46b3d9068f
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package channel;
+
+import entities.StatData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Bounded blocking queue of {@link StatData}. Used in source and sink.
+ */
+public class DataQueue {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
+
+    private final LinkedBlockingQueue<StatData> queue;
+
+    /**
+     * Create a queue that holds at most {@code capacity} elements.
+     *
+     * @param capacity maximum number of queued elements; must be positive,
+     *        otherwise {@link LinkedBlockingQueue} rejects it
+     */
+    public DataQueue(int capacity) {
+        queue = new LinkedBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Push data, waiting for free space if the queue is full.
+     *
+     * @param statDataPo element to enqueue; must not be null
+     * @throws InterruptedException if interrupted while waiting for space
+     */
+    public void push(StatData statDataPo) throws InterruptedException {
+        queue.put(statDataPo);
+    }
+
+    /**
+     * Pull data, waiting up to the given timeout for an element to arrive.
+     *
+     * @param timeout how long to wait before giving up, in units of {@code unit}
+     * @param unit time unit of {@code timeout}
+     * @return the head of the queue, or {@code null} if the timeout elapsed first
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public StatData pull(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return queue.poll(timeout, unit);
+    }
+
+    /**
+     * Destroy: drop every element that is still queued.
+     */
+    public void destroy() {
+        // The queue is final and always assigned in the constructor,
+        // so no null check is needed here.
+        queue.clear();
+        LOG.info("destroy channel!");
+    }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
b/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
new file mode 100644
index 0000000000..a3bd9b29f9
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config;
+
+/**
+ * Config constants.
+ *
+ * <p>{@code KEY_*} constants are property names looked up in the service
+ * configuration; {@code DEFAULT_*} constants are the fallback values.
+ */
+public class ConfigConstants {
+
+    // Source config (ClickHouse)
+    public static final String KEY_CLICKHOUSE_DRIVER = "clickhouse.driver";
+    public static final String DEFAULT_CLICKHOUSE_DRIVER =
+            "ru.yandex.clickhouse.ClickHouseDriver";
+    public static final String KEY_CLICKHOUSE_URL = "clickhouse.url";
+    public static final String KEY_CLICKHOUSE_USERNAME = "clickhouse.username";
+    public static final String KEY_CLICKHOUSE_PASSWORD = "clickhouse.password";
+
+    // DB config (MySQL)
+    public static final String KEY_MYSQL_DRIVER = "mysql.driver";
+    public static final String DEFAULT_MYSQL_DRIVER =
+            "com.mysql.cj.jdbc.Driver";
+    /**
+     * @deprecated misnamed: this is a default value, not a property key.
+     *             Use {@link #DEFAULT_MYSQL_DRIVER} instead.
+     */
+    @Deprecated
+    public static final String KEY_DEFAULT_MYSQL_DRIVER = DEFAULT_MYSQL_DRIVER;
+    public static final String KEY_MYSQL_URL = "mysql.url";
+    public static final String KEY_MYSQL_USERNAME = "mysql.username";
+    public static final String KEY_MYSQL_PASSWORD = "mysql.password";
+
+    // Time config
+    public static final String KEY_QUERY_SQL_TIME_OUT = "query.sql.timeout";
+    public static final int DEFAULT_QUERY_SQL_TIME_OUT = 300;
+    public static final String KEY_JDBC_TIME_OUT = "jdbc.timeout.second";
+    public static final int DEFAULT_JDBC_TIME_OUT = 300;
+    public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT =
+            "datasource.connection.timeout.ms";
+    public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5;
+    public static final String KEY_API_RESPONSE_TIMEOUT =
+            "api.response.timeout";
+    public static final int DEFAULT_API_TIMEOUT = 30;
+    public static final String KEY_QUEUE_PULL_TIMEOUT =
+            "queue.pull.timeout.ms";
+    public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
+
+    // Interval config
+    public static final String KEY_SOURCE_CLICKHOUSE_STAT_INTERVAL =
+            "source.clickhouse.stat.interval.minute";
+    public static final int DEFAULT_SOURCE_CLICKHOUSE_STAT_INTERVAL = 1;
+    public static final String KEY_SOURCE_DB_STAT_INTERVAL =
+            "source.db.stat.interval.minute";
+    public static final int DEFAULT_SOURCE_DB_STAT_INTERVAL = 1;
+    public static final String KEY_SOURCE_DB_SINK_INTERVAL =
+            "sink.db.interval.ms";
+    public static final int DEFAULT_SOURCE_DB_SINK_INTERVAL = 100;
+    public static final String KEY_SOURCE_DB_SINK_BATCH = "sink.db.batch";
+    public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
+    // NOTE(review): "internal" looks like a typo of "interval", and this pair
+    // mirrors KEY_SOURCE_DB_SINK_INTERVAL above. The property name is kept
+    // unchanged because it is read at runtime - confirm before renaming.
+    public static final String KEY_SOURCE_DB_SINK_INTERNAL =
+            "sink.db.internal.ms";
+    public static final int DEFAULT_SOURCE_DB_SINK_INTERNAL = 100;
+
+    // Api config
+    public static final String KEY_HOUR_API_PATH = "hour.api.path";
+    public static final String DEFAULT_HOUR_API_PATH = "/audit/query/hour";
+    public static final String KEY_DAY_API_PATH = "day.api.path";
+    public static final String DEFAULT_DAY_API_PATH = "/audit/query/day";
+    public static final String KEY_DAY_API_TABLE = "day.api.table";
+    public static final String DEFAULT_DAY_API_TABLE = "audit_data_day";
+    public static final String KEY_MINUTE_API_TABLE = "minute.api.table";
+    public static final String DEFAULT_MINUTE_API_TABLE = "audit_data_temp";
+    public static final String KEY_MINUTE_10_API_PATH = "minute.10.api.path";
+    public static final String DEFAULT_MINUTE_10_API_PATH =
+            "/audit/query/minute/10";
+    public static final String KEY_MINUTE_30_API_PATH = "minute.30.api.path";
+    public static final String DEFAULT_MINUTE_30_API_PATH =
+            "/audit/query/minute/30";
+    public static final String KEY_API_POOL_SIZE = "api.pool.size";
+    public static final int DEFAULT_POOL_SIZE = 10;
+    public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
+    public static final int DEFAULT_API_BACKLOG_SIZE = 100;
+
+    // Datasource pool config
+    public static final String KEY_DATASOURCE_POOL_SIZE =
+            "datasource.pool.size";
+    public static final int DEFAULT_DATASOURCE_POOL_SIZE = 1000;
+
+    // Data queue and audit id config
+    public static final String KEY_DATA_QUEUE_SIZE = "data.queue.size";
+    public static final int DEFAULT_DATA_QUEUE_SIZE = 1000000;
+    public static final String KEY_AUDIT_IDS = "audit.ids";
+    // Audit ids are stored as one ';'-separated string.
+    public static final String DEFAULT_AUDIT_IDS = "3;4;5;6";
+
+    // Summary config (realtime)
+    public static final String KEY_REALTIME_SUMMARY_SOURCE_TABLE =
+            "realtime.summary.source.table";
+    public static final String DEFAULT_REALTIME_SUMMARY_SOURCE_TABLE =
+            "audit_data";
+    public static final String KEY_REALTIME_SUMMARY_SINK_TABLE =
+            "realtime.summary.sink.table";
+    public static final String DEFAULT_REALTIME_SUMMARY_SINK_TABLE =
+            "audit_data_temp";
+    public static final String KEY_REALTIME_SUMMARY_BEFORE_TIMES =
+            "realtime.summary.before.times";
+    public static final int DEFAULT_REALTIME_SUMMARY_BEFORE_TIMES = 6;
+
+    // Summary config (daily)
+    public static final String KEY_DAILY_SUMMARY_SOURCE_TABLE =
+            "daily.summary.source.table";
+    public static final String DEFAULT_DAILY_SUMMARY_SOURCE_TABLE =
+            "audit_data_temp";
+    public static final String KEY_DAILY_SUMMARY_SINK_TABLE =
+            "daily.summary.sink.table";
+    public static final String DEFAULT_DAILY_SUMMARY_SINK_TABLE =
+            "audit_data_day";
+    public static final String KEY_DAILY_SUMMARY_BEFORE_TIMES =
+            "daily.summary.before.times";
+    public static final int DEFAULT_DAILY_SUMMARY_BEFORE_TIMES = 2;
+
+}
diff --git a/inlong-audit/audit-service/src/main/java/config/Configuration.java
b/inlong-audit/audit-service/src/main/java/config/Configuration.java
new file mode 100644
index 0000000000..56a50f8266
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/config/Configuration.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * Configuration. Only one instance in the process.
+ * It uses the properties file {@link #DEFAULT_CONFIG_FILE} to store
+ * configurations.
+ */
+public class Configuration {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Configuration.class);
+    public static final String DEFAULT_CONFIG_FILE =
+            "conf/audit-service.properties";
+
+    private static volatile Configuration conf = null;
+    Properties properties = new Properties();
+
+    /**
+     * Load config from {@link #DEFAULT_CONFIG_FILE}. A load failure is logged
+     * and leaves the properties empty, so every lookup falls back to the
+     * caller's default.
+     */
+    private Configuration() {
+        try (FileInputStream fileInputStream =
+                new FileInputStream(DEFAULT_CONFIG_FILE)) {
+            properties.load(fileInputStream);
+        } catch (Exception e) {
+            LOG.error("Configuration has exception!", e);
+        }
+    }
+
+    /**
+     * Singleton for configuration; the instance is created on first use.
+     *
+     * @return static instance of Configuration
+     */
+    public static Configuration getInstance() {
+        if (conf == null) {
+            synchronized (Configuration.class) {
+                if (conf == null) {
+                    conf = new Configuration();
+                }
+            }
+        }
+        return conf;
+    }
+
+    /**
+     * Get a string setting.
+     *
+     * @param key property name
+     * @param defaultValue value returned when the key is not configured
+     * @return the configured value, or {@code defaultValue} if absent
+     */
+    public String get(String key, String defaultValue) {
+        Object value = properties.get(key);
+        return value == null ? defaultValue : value.toString();
+    }
+
+    /**
+     * Get an integer setting.
+     *
+     * <p>Values loaded from the properties file are strings, so they are
+     * parsed here rather than cast; surrounding whitespace is ignored.
+     *
+     * @param key property name
+     * @param defaultValue value returned when the key is not configured or
+     *        its value is not a valid integer
+     * @return the configured integer, or {@code defaultValue}
+     */
+    public int get(String key, int defaultValue) {
+        Object value = properties.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        // Covers values put into the properties programmatically.
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        try {
+            return Integer.parseInt(value.toString().trim());
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid integer value '{}' for key {}, use default {}",
+                    value, key, defaultValue, e);
+            return defaultValue;
+        }
+    }
+
+    /**
+     * Get a string setting without a default.
+     *
+     * @param key property name
+     * @return the configured value, or {@code null} if absent
+     */
+    public String get(String key) {
+        Object value = properties.get(key);
+        return value == null ? null : value.toString();
+    }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
b/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
similarity index 88%
copy from inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
copy to inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
index 9e079a875b..f26ea02566 100644
--- a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
+++ b/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
@@ -19,14 +19,17 @@ package entities;
import lombok.Data;
+/**
+ * Source config
+ */
@Data
-public class SourceEntities {
+public class SourceConfig {
private AuditCycle auditCycle;
private String sourceTable;
private int beforeTimes;
- public SourceEntities(AuditCycle auditCycle, String sourceTable, int
beforeTimes) {
+ public SourceConfig(AuditCycle auditCycle, String sourceTable, int
beforeTimes) {
this.auditCycle = auditCycle;
this.sourceTable = sourceTable;
this.beforeTimes = beforeTimes;
diff --git
a/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
b/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
new file mode 100644
index 0000000000..b2c8cc8f2e
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package source;
+
+import channel.DataQueue;
+import entities.SourceConfig;
+import entities.StartEndTime;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Abstract source. Holds the state shared by sources (data queue, audit
+ * ids, data source, query settings) and builds the time windows to query.
+ */
+@Data
+public class AbstractSource {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractSource.class);
+    protected final ConcurrentHashMap<Integer, ScheduledExecutorService> statTimers =
+            new ConcurrentHashMap<>();
+    protected DataQueue dataQueue;
+    protected List<String> auditIds;
+    protected int querySqlTimeout;
+    protected DataSource dataSource;
+    protected String querySql;
+    protected SourceConfig sourceConfig;
+
+    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    public AbstractSource(DataQueue dataQueue) {
+        this.dataQueue = dataQueue;
+    }
+
+    /**
+     * Get stat cycle of minute: split the hour that lies {@code beforeHour}
+     * hours before now into consecutive windows of {@code dataCycle} minutes.
+     *
+     * <p>Each window starts at minute {@code step} and ends at minute
+     * {@code step + dataCycle - 1}, both with seconds set to 0.
+     *
+     * @param beforeHour number of hours to go back from the current time
+     * @param dataCycle window length in minutes; must be positive
+     * @return the windows in ascending order, formatted with
+     *         {@link #DATE_FORMAT}
+     * @throws IllegalArgumentException if {@code dataCycle} is not positive
+     */
+    public List<StartEndTime> getStatCycleMinute(int beforeHour, int dataCycle) {
+        // A non-positive cycle would never advance the loop below.
+        if (dataCycle <= 0) {
+            throw new IllegalArgumentException(
+                    "dataCycle must be positive, but was " + dataCycle);
+        }
+        // Read the clock once so that all windows belong to the same hour,
+        // even if the wall clock crosses an hour boundary meanwhile.
+        Calendar base = Calendar.getInstance();
+        base.add(Calendar.HOUR_OF_DAY, -beforeHour);
+        base.set(Calendar.SECOND, 0);
+        // SimpleDateFormat is not thread-safe, so it stays method-local.
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+        List<StartEndTime> statCycleList = new LinkedList<>();
+        for (int step = 0; step < 60; step += dataCycle) {
+            // Work on a copy: an end minute above 59 rolls into the next
+            // hour and must not leak into the following window.
+            Calendar calendar = (Calendar) base.clone();
+            calendar.set(Calendar.MINUTE, step);
+            StartEndTime statCycle = new StartEndTime();
+            statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+            calendar.set(Calendar.MINUTE, step + dataCycle - 1);
+            statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+            statCycleList.add(statCycle);
+        }
+        return statCycleList;
+    }
+
+    /**
+     * Get stat cycle of day: one window covering the day that lies
+     * {@code beforeDay} days before today, from 00:00:00 to 23:59:00.
+     *
+     * @param beforeDay number of days to go back from the current date
+     * @return a mutable single-element list holding that window, formatted
+     *         with {@link #DATE_FORMAT}
+     */
+    public List<StartEndTime> getStatCycleDay(int beforeDay) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DATE, -beforeDay);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+        StartEndTime statCycle = new StartEndTime();
+        statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+        // Seconds stay at 0, so the end of the window is 23:59:00.
+        calendar.set(Calendar.HOUR_OF_DAY, 23);
+        calendar.set(Calendar.MINUTE, 59);
+        statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+
+        // A plain ArrayList instead of double-brace initialization, which
+        // creates an anonymous subclass holding a reference to this source.
+        List<StartEndTime> statCycleList = new ArrayList<>(1);
+        statCycleList.add(statCycle);
+        return statCycleList;
+    }
+
+    /**
+     * Destory. The misspelled name is kept so that existing callers and
+     * overrides keep working.
+     */
+    public void destory() {
+        // The data source is not set by the constructor, so it may be absent.
+        if (dataSource == null) {
+            return;
+        }
+        // NOTE(review): this obtains a connection and closes it again; it
+        // does not shut down the data source itself. If this source owns a
+        // pooled data source, closing the pool is probably what is wanted -
+        // confirm ownership before changing.
+        try {
+            dataSource.getConnection().close();
+        } catch (SQLException exception) {
+            // Pass the exception too, so the stack trace is not lost.
+            LOG.error(exception.getMessage(), exception);
+        }
+    }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
b/inlong-audit/audit-service/src/main/java/source/SourceStat.java
similarity index 67%
rename from
inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
rename to inlong-audit/audit-service/src/main/java/source/SourceStat.java
index 9e079a875b..4477d25532 100644
--- a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
+++ b/inlong-audit/audit-service/src/main/java/source/SourceStat.java
@@ -15,20 +15,16 @@
* limitations under the License.
*/
-package entities;
+package source;
-import lombok.Data;
+/**
+ * Source stat interface.
+ */
+public interface SourceStat extends Runnable {
-@Data
-public class SourceEntities {
+ public void statByStep();
- private AuditCycle auditCycle;
- private String sourceTable;
- private int beforeTimes;
+ public void aggregate(String auditId);
- public SourceEntities(AuditCycle auditCycle, String sourceTable, int
beforeTimes) {
- this.auditCycle = auditCycle;
- this.sourceTable = sourceTable;
- this.beforeTimes = beforeTimes;
- }
+ public void query(String startTime, String endTime, String auditId);
}