This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dcf31a5ad [INLONG-9918][Audit] Audit-service add codes of config (#9919)
8dcf31a5ad is described below

commit 8dcf31a5adbf5642d6bd4eb44a0e161b748d9b4e
Author: doleyzi <[email protected]>
AuthorDate: Sun Apr 7 11:26:21 2024 +0800

    [INLONG-9918][Audit] Audit-service add codes of config (#9919)
---
 .../src/main/java/channel/DataQueue.java           |  69 ++++++++++++
 .../src/main/java/config/ConfigConstants.java      | 104 +++++++++++++++++
 .../src/main/java/config/Configuration.java        |  93 +++++++++++++++
 .../{SourceEntities.java => SourceConfig.java}     |   7 +-
 .../src/main/java/source/AbstractSource.java       | 125 +++++++++++++++++++++
 .../SourceEntities.java => source/SourceStat.java} |  20 ++--
 6 files changed, 404 insertions(+), 14 deletions(-)

diff --git a/inlong-audit/audit-service/src/main/java/channel/DataQueue.java 
b/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
new file mode 100644
index 0000000000..46b3d9068f
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/channel/DataQueue.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package channel;
+
+import entities.StatData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Bounded blocking buffer of {@link StatData} records.
+ * Use in source and sink: one side pushes, the other pulls.
+ */
+public class DataQueue {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
+
+    private final LinkedBlockingQueue<StatData> queue;
+
+    /**
+     * Create a queue with a fixed upper bound.
+     *
+     * @param capacity maximum number of records held at once
+     */
+    public DataQueue(int capacity) {
+        this.queue = new LinkedBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Append one record, waiting for free space while the queue is full.
+     *
+     * @param statDataPo record to enqueue
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void push(StatData statDataPo) throws InterruptedException {
+        this.queue.put(statDataPo);
+    }
+
+    /**
+     * Take the oldest record, waiting up to the given time for one to arrive.
+     *
+     * @param timeout how long to wait before giving up
+     * @param unit time unit of {@code timeout}
+     * @return the head record, or {@code null} when the wait elapsed with nothing available
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public StatData pull(long timeout, TimeUnit unit) throws InterruptedException {
+        final StatData head = this.queue.poll(timeout, unit);
+        return head;
+    }
+
+    /**
+     * Drop every record still buffered and log the shutdown.
+     */
+    public void destroy() {
+        // The queue field is final and assigned in the constructor, so it is never null here.
+        this.queue.clear();
+        LOG.info("destroy channel!");
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java 
b/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
new file mode 100644
index 0000000000..a3bd9b29f9
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config;
+
+/**
+ * Config constants
+ *
+ * Property keys (KEY_*) paired with the values used when a key is absent (DEFAULT_*).
+ * Where a unit applies it is spelled out in the key string itself
+ * (".second", ".ms", ".minute"); keys without such a suffix do not state their unit here.
+ */
+public class ConfigConstants {
+
+    // Source config: ClickHouse JDBC driver class, URL and credentials
+    public static final String KEY_CLICKHOUSE_DRIVER = "clickhouse.driver";
+    public static final String DEFAULT_CLICKHOUSE_DRIVER = 
"ru.yandex.clickhouse.ClickHouseDriver";
+    public static final String KEY_CLICKHOUSE_URL = "clickhouse.url";
+    public static final String KEY_CLICKHOUSE_USERNAME = "clickhouse.username";
+    public static final String KEY_CLICKHOUSE_PASSWORD = "clickhouse.password";
+
+    // DB config: MySQL JDBC driver class, URL and credentials
+    public static final String KEY_MYSQL_DRIVER = "mysql.driver";
+    public static final String KEY_DEFAULT_MYSQL_DRIVER = 
"com.mysql.cj.jdbc.Driver"; // NOTE(review): despite the KEY_ prefix this holds a driver class name (a default value), not a property key
+    public static final String KEY_MYSQL_URL = "mysql.url";
+    public static final String KEY_MYSQL_USERNAME = "mysql.username";
+    public static final String KEY_MYSQL_PASSWORD = "mysql.password";
+
+    // Time config
+    public static final String KEY_QUERY_SQL_TIME_OUT = "query.sql.timeout"; // unit not stated in the key - TODO confirm
+    public static final int DEFAULT_QUERY_SQL_TIME_OUT = 300;
+    public static final String KEY_JDBC_TIME_OUT = "jdbc.timeout.second";
+    public static final int DEFAULT_JDBC_TIME_OUT = 300;
+    public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT = 
"datasource.connection.timeout.ms";
+    public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5; // 300000, i.e. five minutes given the ".ms" key suffix
+    public static final String KEY_API_RESPONSE_TIMEOUT = 
"api.response.timeout"; // unit not stated in the key - TODO confirm
+    public static final int DEFAULT_API_TIMEOUT = 30;
+    public static final String KEY_QUEUE_PULL_TIMEOUT = 
"queue.pull.timeout.ms";
+    public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
+
+    // Interval config
+    public static final String KEY_SOURCE_CLICKHOUSE_STAT_INTERVAL = 
"source.clickhouse.stat.interval.minute";
+    public static final int DEFAULT_SOURCE_CLICKHOUSE_STAT_INTERVAL = 1;
+    public static final String KEY_SOURCE_DB_STAT_INTERVAL = 
"source.db.stat.interval.minute";
+    public static final int DEFAULT_SOURCE_DB_STAT_INTERVAL = 1;
+    public static final String KEY_SOURCE_DB_SINK_INTERVAL = 
"sink.db.interval.ms";
+    public static final int DEFAULT_SOURCE_DB_SINK_INTERVAL = 100;
+    public static final String KEY_SOURCE_DB_SINK_BATCH = "sink.db.batch";
+    public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
+    public static final String KEY_SOURCE_DB_SINK_INTERNAL = 
"sink.db.internal.ms"; // NOTE(review): one letter away from "sink.db.interval.ms" above and same default (100) - possible duplicate/typo, confirm with callers
+    public static final int DEFAULT_SOURCE_DB_SINK_INTERNAL = 100;
+
+    // Api config: HTTP paths, backing tables and sizing
+    public static final String KEY_HOUR_API_PATH = "hour.api.path";
+    public static final String DEFAULT_HOUR_API_PATH = "/audit/query/hour";
+    public static final String KEY_DAY_API_PATH = "day.api.path";
+    public static final String DEFAULT_DAY_API_PATH = "/audit/query/day";
+    public static final String KEY_DAY_API_TABLE = "day.api.table";
+    public static final String DEFAULT_DAY_API_TABLE = "audit_data_day";
+    public static final String KEY_MINUTE_API_TABLE = "minute.api.table";
+    public static final String DEFAULT_MINUTE_API_TABLE = "audit_data_temp";
+    public static final String KEY_MINUTE_10_API_PATH = "minute.10.api.path";
+    public static final String DEFAULT_MINUTE_10_API_PATH = 
"/audit/query/minute/10";
+    public static final String KEY_MINUTE_30_API_PATH = "minute.30.api.path";
+    public static final String DEFAULT_MINUTE_30_API_PATH = 
"/audit/query/minute/30";
+    public static final String KEY_API_POOL_SIZE = "api.pool.size";
+    public static final int DEFAULT_POOL_SIZE = 10;
+    public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
+    public static final int DEFAULT_API_BACKLOG_SIZE = 100;
+
+    public static final String KEY_DATASOURCE_POOL_SIZE = 
"datasource.pool.size";
+    public static final int DEFAULT_DATASOURCE_POOL_SIZE = 1000;
+
+    public static final String KEY_DATA_QUEUE_SIZE = "data.queue.size";
+    public static final int DEFAULT_DATA_QUEUE_SIZE = 1000000;
+    public static final String KEY_AUDIT_IDS = "audit.ids";
+    public static final String DEFAULT_AUDIT_IDS = "3;4;5;6"; // presumably a ';'-separated list of audit ids - confirm against the code that parses it
+
+    // Summary config: realtime summary defaults to audit_data -> audit_data_temp, daily summary to audit_data_temp -> audit_data_day
+    public static final String KEY_REALTIME_SUMMARY_SOURCE_TABLE = 
"realtime.summary.source.table";
+    public static final String DEFAULT_REALTIME_SUMMARY_SOURCE_TABLE = 
"audit_data";
+    public static final String KEY_REALTIME_SUMMARY_SINK_TABLE = 
"realtime.summary.sink.table";
+    public static final String DEFAULT_REALTIME_SUMMARY_SINK_TABLE = 
"audit_data_temp";
+    public static final String KEY_REALTIME_SUMMARY_BEFORE_TIMES = 
"realtime.summary.before.times";
+    public static final int DEFAULT_REALTIME_SUMMARY_BEFORE_TIMES = 6;
+
+    public static final String KEY_DAILY_SUMMARY_SOURCE_TABLE = 
"daily.summary.source.table";
+    public static final String DEFAULT_DAILY_SUMMARY_SOURCE_TABLE = 
"audit_data_temp"; // same table name as DEFAULT_REALTIME_SUMMARY_SINK_TABLE
+    public static final String KEY_DAILY_SUMMARY_SINK_TABLE = 
"daily.summary.sink.table";
+    public static final String DEFAULT_DAILY_SUMMARY_SINK_TABLE = 
"audit_data_day";
+    public static final String KEY_DAILY_SUMMARY_BEFORE_TIMES = 
"daily.summary.before.times";
+    public static final int DEFAULT_DAILY_SUMMARY_BEFORE_TIMES = 2;
+
+}
diff --git a/inlong-audit/audit-service/src/main/java/config/Configuration.java 
b/inlong-audit/audit-service/src/main/java/config/Configuration.java
new file mode 100644
index 0000000000..56a50f8266
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/config/Configuration.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * Configuration. Only one instance in the process.
+ * Settings are read once from the properties file {@link #DEFAULT_CONFIG_FILE};
+ * every getter falls back to a caller-supplied default when a key is absent.
+ */
+public class Configuration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
+    public static final String DEFAULT_CONFIG_FILE = "conf/audit-service.properties";
+
+    private static volatile Configuration conf = null;
+    Properties properties = new Properties();
+
+    /**
+     * Load the settings from {@link #DEFAULT_CONFIG_FILE}.
+     * A missing or unreadable file is logged and leaves the instance empty,
+     * so callers then only ever see their defaults.
+     */
+    private Configuration() {
+        try (FileInputStream fileInputStream = new FileInputStream(DEFAULT_CONFIG_FILE)) {
+            properties.load(fileInputStream);
+        } catch (Exception e) {
+            LOG.error("Configuration has exception!", e);
+        }
+    }
+
+    /**
+     * Singleton accessor, lazily initialised with double-checked locking
+     * on the volatile {@code conf} field.
+     *
+     * @return static instance of Configuration
+     */
+    public static Configuration getInstance() {
+        if (conf == null) {
+            synchronized (Configuration.class) {
+                if (conf == null) {
+                    conf = new Configuration();
+                }
+            }
+        }
+        return conf;
+    }
+
+    /**
+     * Look up a string setting.
+     *
+     * @param key property name
+     * @param defaultValue value returned when the key is absent
+     * @return the configured value, or {@code defaultValue} when the key is absent
+     */
+    public String get(String key, String defaultValue) {
+        Object value = properties.get(key);
+        return value == null ? defaultValue : value.toString();
+    }
+
+    /**
+     * Look up an integer setting.
+     * Values loaded from a properties file are strings, so the text is parsed
+     * instead of being cast; a blank or non-numeric value is reported and the
+     * default is used.
+     *
+     * @param key property name
+     * @param defaultValue value returned when the key is absent or not a valid integer
+     * @return the configured integer, or {@code defaultValue}
+     */
+    public int get(String key, int defaultValue) {
+        Object value = properties.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        // Still honour numeric objects that were put into the table programmatically.
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        try {
+            return Integer.parseInt(value.toString().trim());
+        } catch (NumberFormatException e) {
+            LOG.warn("Invalid integer value '{}' for key '{}', use default {}", value, key, defaultValue);
+            return defaultValue;
+        }
+    }
+
+    /**
+     * Look up a string setting without a default.
+     *
+     * @param key property name
+     * @return the configured value, or {@code null} when the key is absent
+     */
+    public String get(String key) {
+        Object value = properties.get(key);
+        return value == null ? null : value.toString();
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java 
b/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
similarity index 88%
copy from inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
copy to inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
index 9e079a875b..f26ea02566 100644
--- a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
+++ b/inlong-audit/audit-service/src/main/java/entities/SourceConfig.java
@@ -19,14 +19,17 @@ package entities;
 
 import lombok.Data;
 
+/**
+ * Source config
+ */
 @Data
-public class SourceEntities {
+public class SourceConfig {
 
     private AuditCycle auditCycle;
     private String sourceTable;
     private int beforeTimes;
 
-    public SourceEntities(AuditCycle auditCycle, String sourceTable, int 
beforeTimes) {
+    public SourceConfig(AuditCycle auditCycle, String sourceTable, int 
beforeTimes) {
         this.auditCycle = auditCycle;
         this.sourceTable = sourceTable;
         this.beforeTimes = beforeTimes;
diff --git 
a/inlong-audit/audit-service/src/main/java/source/AbstractSource.java 
b/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
new file mode 100644
index 0000000000..b2c8cc8f2e
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/source/AbstractSource.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package source;
+
+import channel.DataQueue;
+import entities.SourceConfig;
+import entities.StartEndTime;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Abstract source
+ */
+@Data
+public class AbstractSource {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSource.class);
+    protected final ConcurrentHashMap<Integer, ScheduledExecutorService> 
statTimers = new ConcurrentHashMap<>();
+    protected DataQueue dataQueue;
+    protected List<String> auditIds;
+    protected int querySqlTimeout;
+    protected DataSource dataSource;
+    protected String querySql;
+    protected SourceConfig sourceConfig;
+
+    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    public AbstractSource(DataQueue dataQueue) {
+        this.dataQueue = dataQueue;
+    }
+
+    /**
+     * Get stat cycle of minute
+     *
+     * @param beforeHour
+     * @param dataCycle
+     * @return
+     */
+    public List<StartEndTime> getStatCycleMinute(int beforeHour, int 
dataCycle) {
+        List<StartEndTime> statCycleList = new LinkedList<>();
+        for (int step = 0; step < 60; step = step + dataCycle) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.HOUR_OF_DAY, -beforeHour);
+
+            calendar.set(Calendar.MINUTE, step);
+            calendar.set(Calendar.SECOND, 0);
+            SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+            StartEndTime statCycle = new StartEndTime();
+            statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+            calendar.set(Calendar.MINUTE, step + dataCycle - 1);
+            calendar.set(Calendar.SECOND, 0);
+            statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+            statCycleList.add(statCycle);
+        }
+        return statCycleList;
+    }
+
+    /**
+     * Get stat cycle of day
+     *
+     * @param beforeDay
+     * @return
+     */
+    public List<StartEndTime> getStatCycleDay(int beforeDay) {
+        StartEndTime statCycle = new StartEndTime();
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DATE, -beforeDay);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+        statCycle.setStartTime(dateFormat.format(calendar.getTime()));
+
+        calendar.set(Calendar.HOUR_OF_DAY, 23);
+        calendar.set(Calendar.MINUTE, 59);
+        statCycle.setEndTime(dateFormat.format(calendar.getTime()));
+        return new ArrayList<StartEndTime>() {
+
+            {
+                add(statCycle);
+            }
+        };
+    }
+
+    /**
+     * Destory
+     */
+    public void destory() {
+        try {
+            dataSource.getConnection().close();
+        } catch (SQLException exception) {
+            LOG.error(exception.getMessage());
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java 
b/inlong-audit/audit-service/src/main/java/source/SourceStat.java
similarity index 67%
rename from 
inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
rename to inlong-audit/audit-service/src/main/java/source/SourceStat.java
index 9e079a875b..4477d25532 100644
--- a/inlong-audit/audit-service/src/main/java/entities/SourceEntities.java
+++ b/inlong-audit/audit-service/src/main/java/source/SourceStat.java
@@ -15,20 +15,16 @@
  * limitations under the License.
  */
 
-package entities;
+package source;
 
-import lombok.Data;
+/**
+ * Source stat interface.
+ */
+public interface SourceStat extends Runnable {
 
-@Data
-public class SourceEntities {
+    public void statByStep();
 
-    private AuditCycle auditCycle;
-    private String sourceTable;
-    private int beforeTimes;
+    public void aggregate(String auditId);
 
-    public SourceEntities(AuditCycle auditCycle, String sourceTable, int 
beforeTimes) {
-        this.auditCycle = auditCycle;
-        this.sourceTable = sourceTable;
-        this.beforeTimes = beforeTimes;
-    }
+    public void query(String startTime, String endTime, String auditId);
 }

Reply via email to