This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 75f2e984fb [INLONG-11360][Audit] Add a metric monitoring system for
the Audit Store itself (#11363)
75f2e984fb is described below
commit 75f2e984fb95d0fdedf2f80121df352e95cbc4d7
Author: doleyzi <[email protected]>
AuthorDate: Wed Oct 16 18:35:54 2024 +0800
[INLONG-11360][Audit] Add a metric monitoring system for the Audit Store
itself (#11363)
---
.../apache/inlong/audit/metric/AbstractMetric.java | 1 +
.../apache/inlong/audit/metric/MetricsManager.java | 1 +
.../metric/prometheus/ProxyPrometheusMetric.java | 5 +++
.../inlong/audit/{ => store}/Application.java | 2 +-
.../config/ConfigConstants.java} | 23 +++++------
.../audit/{ => store}/config/JdbcConfig.java | 2 +-
.../{ => store}/config/MessageQueueConfig.java | 2 +-
.../audit/{ => store}/config/StoreConfig.java | 2 +-
.../audit/{db => store}/entities/JdbcDataPo.java | 2 +-
.../metric/MetricDimension.java} | 27 ++++++-------
.../metric/MetricItem.java} | 38 ++++++++-----------
.../inlong/audit/store}/metric/MetricsManager.java | 43 ++++++++++-----------
.../metric/prometheus/StorePrometheusMetric.java} | 44 +++++++++++++---------
.../service/AuditMsgConsumerServer.java | 26 +++++++++----
.../audit/{ => store}/service/InsertData.java | 2 +-
.../audit/{ => store}/service/JdbcService.java | 15 ++++++--
.../{ => store}/service/consume/BaseConsume.java | 15 ++++++--
.../{ => store}/service/consume/KafkaConsume.java | 10 +++--
.../{ => store}/service/consume/PulsarConsume.java | 11 ++++--
.../{ => store}/service/consume/TubeConsume.java | 12 ++++--
.../service/consume/KafkaConsumeTest.java | 12 +++---
.../service/consume/PulsarConsumeTest.java | 2 +-
.../service/consume/TubeConsumeTest.java | 12 +++---
inlong-audit/conf/application.properties | 10 ++++-
24 files changed, 181 insertions(+), 138 deletions(-)
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
index 4c2f627916..a54995b8d6 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
@@ -20,4 +20,5 @@ package org.apache.inlong.audit.metric;
public interface AbstractMetric {
public void report();
+ public void stop();
}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
index 433fc71848..f27920159b 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
@@ -94,5 +94,6 @@ public class MetricsManager {
}
public void shutdown() {
timer.shutdown();
+ metric.stop();
}
}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
index 07c2397743..0871a613b3 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
@@ -80,4 +80,9 @@ public class ProxyPrometheusMetric extends Collector
implements AbstractMetric {
public void report() {
LOGGER.info("Report proxy prometheus metric: {} ",
metricItem.toString());
}
+
+ @Override
+ public void stop() {
+ server.close();
+ }
}
\ No newline at end of file
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java
similarity index 97%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java
index 0bcb3aa973..46cc7212eb 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.store;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java
similarity index 59%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java
index 7f9bcf8207..a0585efd37 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java
@@ -15,22 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
-
-import org.apache.inlong.audit.protocol.AuditData;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
+package org.apache.inlong.audit.store.config;
/**
- * Insert Data interface
+ * Config constants
*/
-public interface InsertData {
-
- /**
- * insert audit data to storage.
- */
- void insert(AuditData msgBody);
+public class ConfigConstants {
- void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId);
+ public static final String AUDIT_STORE_SERVER_NAME = "audit-store";
+ public static final String KEY_PROMETHEUS_PORT =
"audit.store.prometheus.port";
+ public static final int DEFAULT_PROMETHEUS_PORT = 10083;
+ public static final String KEY_STORE_METRIC_CLASSNAME =
"audit.store.metric.classname";
+ public static final String DEFAULT_STORE_METRIC_CLASSNAME =
+
"org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric";
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
similarity index 97%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
index 42249200f0..0b6c7c36bb 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.store.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java
similarity index 98%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java
index 2cfa4b1d34..3a4e6c6cae 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.store.config;
import lombok.Getter;
import lombok.Setter;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java
similarity index 96%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java
index ca3358701e..abee9852fe 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.store.config;
import lombok.Getter;
import lombok.Setter;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java
similarity index 96%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java
index ebc42f4a53..0c568225b3 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.db.entities;
+package org.apache.inlong.audit.store.entities;
import lombok.Data;
import org.apache.pulsar.client.api.Consumer;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
similarity index 64%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
index 7f9bcf8207..02c2258dbc 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.metric;
-import org.apache.inlong.audit.protocol.AuditData;
+public enum MetricDimension {
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
+ RECEIVE_COUNT_SUCCESS("receiveCountSuccess"),
+ RECEIVE_FAILED("receiveFailed"),
+ SEND_COUNT_SUCCESS("sendCountSuccess"),
+ SEND_COUNT_FAILED("sendCountFailed"),
+ SEND_DURATION("sendDuration");
-/**
- * Insert Data interface
- */
-public interface InsertData {
+ private final String key;
- /**
- * insert audit data to storage.
- */
- void insert(AuditData msgBody);
+ MetricDimension(String key) {
+ this.key = key;
+ }
- void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId);
+ public String getKey() {
+ return key;
+ }
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
similarity index 54%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
index ebc42f4a53..0e5dd9ad18 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
@@ -15,32 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.db.entities;
+package org.apache.inlong.audit.store.metric;
import lombok.Data;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import java.sql.Timestamp;
+import java.util.concurrent.atomic.AtomicLong;
@Data
-public class JdbcDataPo {
+public class MetricItem {
- private String ip;
- private String dockerId;
- private String threadId;
- private Timestamp sdkTs;
- private Long packetId;
- private Timestamp logTs;
- private String inLongGroupId;
- private String inLongStreamId;
- private String auditId;
- private String auditTag;
- private long auditVersion;
- private Long count;
- private Long size;
- private Long delay;
- private Timestamp updateTime;
- private Consumer<byte[]> consumer;
- private MessageId messageId;
+ public static final String K_DIMENSION_KEY = "dimensionName";
+ private AtomicLong receiveCountSuccess = new AtomicLong(0);
+ private AtomicLong receiveFailed = new AtomicLong(0);
+ private AtomicLong sendCountSuccess = new AtomicLong(0);
+ private AtomicLong sendCountFailed = new AtomicLong(0);
+ private AtomicLong sendDuration = new AtomicLong(0);
+ public void resetAllMetrics() {
+ receiveCountSuccess.set(0);
+ receiveFailed.set(0);
+ sendCountSuccess.set(0);
+ sendCountFailed.set(0);
+ sendDuration.set(0);
+ }
}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
similarity index 66%
copy from
inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
index 433fc71848..68b69609cf 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.metric;
+package org.apache.inlong.audit.store.metric;
import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.AbstractMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,10 +29,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
-import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;
-import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME;
+import static
org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_STORE_METRIC_CLASSNAME;
+import static
org.apache.inlong.audit.store.config.ConfigConstants.KEY_STORE_METRIC_CLASSNAME;
public class MetricsManager {
@@ -44,16 +43,15 @@ public class MetricsManager {
private AbstractMetric metric;
- public void init(String metricName) {
+ public void init() {
try {
- ConfigManager configManager = ConfigManager.getInstance();
- String metricClassName =
configManager.getValue(KEY_PROXY_METRIC_CLASSNAME,
DEFAULT_PROXY_METRIC_CLASSNAME);
+ String metricClassName =
+
ConfigManager.getInstance().getValue(KEY_STORE_METRIC_CLASSNAME,
DEFAULT_STORE_METRIC_CLASSNAME);
LOGGER.info("Metric class name: {}", metricClassName);
Constructor<?> constructor = Class.forName(metricClassName)
- .getDeclaredConstructor(String.class, MetricItem.class,
int.class);
+ .getDeclaredConstructor(MetricItem.class);
constructor.setAccessible(true);
- metric = (AbstractMetric) constructor.newInstance(metricName,
metricItem,
- configManager.getValue(KEY_PROMETHEUS_PORT,
DEFAULT_PROMETHEUS_PORT));
+ metric = (AbstractMetric) constructor.newInstance(metricItem);
timer.scheduleWithFixedDelay(() -> {
metric.report();
@@ -72,27 +70,26 @@ public class MetricsManager {
private final MetricItem metricItem = new MetricItem();
protected final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor();
- public void addReceiveCountInvalid(long count) {
- metricItem.getReceiveCountInvalid().addAndGet(count);
- }
-
- public void addReceiveCountExpired(long count) {
- metricItem.getReceiveCountExpired().addAndGet(count);
+ public void addReceiveSuccess(long count) {
+ metricItem.getReceiveCountSuccess().addAndGet(count);
}
- public void addReceiveSuccess(long count, long pack, long size) {
- metricItem.getReceiveCountSuccess().addAndGet(count);
- metricItem.getReceivePackSuccess().addAndGet(pack);
- metricItem.getReceiveSizeSuccess().addAndGet(size);
+ public void addReceiveFailed(long pack) {
+ metricItem.getReceiveFailed().addAndGet(pack);
}
- public void addSendSuccess(long count) {
+ public void addSendSuccess(long count, long duration) {
metricItem.getSendCountSuccess().addAndGet(count);
+ metricItem.getSendDuration().addAndGet(duration);
}
- public void addSendFailed(long count) {
+
+ public void addSendFailed(long count, long duration) {
metricItem.getSendCountFailed().addAndGet(count);
+ metricItem.getSendDuration().addAndGet(duration);
}
+
public void shutdown() {
timer.shutdown();
+ metric.stop();
}
}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
similarity index 60%
copy from
inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
index 07c2397743..6aa60feb6c 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.metric.prometheus;
+package org.apache.inlong.audit.store.metric.prometheus;
+import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.metric.AbstractMetric;
-import org.apache.inlong.audit.metric.MetricDimension;
-import org.apache.inlong.audit.metric.MetricItem;
+import org.apache.inlong.audit.store.metric.MetricDimension;
+import org.apache.inlong.audit.store.metric.MetricItem;
import io.prometheus.client.Collector;
import io.prometheus.client.exporter.HTTPServer;
@@ -31,26 +32,28 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static
org.apache.inlong.audit.store.config.ConfigConstants.AUDIT_STORE_SERVER_NAME;
+import static
org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
+import static
org.apache.inlong.audit.store.config.ConfigConstants.KEY_PROMETHEUS_PORT;
+
/**
* PrometheusMetric
*/
-public class ProxyPrometheusMetric extends Collector implements AbstractMetric
{
+public class StorePrometheusMetric extends Collector implements AbstractMetric
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyPrometheusMetric.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StorePrometheusMetric.class);
private static final String HELP_DESCRIPTION = "help";
private final MetricItem metricItem;
- private final String metricName;
private HTTPServer server;
- public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int
prometheusPort) {
- this.metricName = metricName;
+ public StorePrometheusMetric(MetricItem metricItem) {
this.metricItem = metricItem;
try {
- server = new HTTPServer(prometheusPort);
+ server = new
HTTPServer(ConfigManager.getInstance().getValue(KEY_PROMETHEUS_PORT,
DEFAULT_PROMETHEUS_PORT));
this.register();
} catch (IOException e) {
- LOGGER.error("Construct proxy prometheus metric has IOException",
e);
+ LOGGER.error("Construct store prometheus metric has IOException",
e);
}
}
@@ -58,26 +61,31 @@ public class ProxyPrometheusMetric extends Collector
implements AbstractMetric {
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples.Sample> samples = Arrays.asList(
createSample(MetricDimension.RECEIVE_COUNT_SUCCESS,
metricItem.getReceiveCountSuccess().doubleValue()),
- createSample(MetricDimension.RECEIVE_PACK_SUCCESS,
metricItem.getReceivePackSuccess().doubleValue()),
- createSample(MetricDimension.RECEIVE_SIZE_SUCCESS,
metricItem.getReceiveSizeSuccess().doubleValue()),
- createSample(MetricDimension.RECEIVE_COUNT_INVALID,
metricItem.getReceiveCountInvalid().doubleValue()),
- createSample(MetricDimension.RECEIVE_COUNT_EXPIRED,
metricItem.getReceiveCountExpired().doubleValue()),
+ createSample(MetricDimension.RECEIVE_FAILED,
metricItem.getReceiveFailed().doubleValue()),
createSample(MetricDimension.SEND_COUNT_SUCCESS,
metricItem.getSendCountSuccess().doubleValue()),
- createSample(MetricDimension.SEND_COUNT_FAILED,
metricItem.getSendCountFailed().doubleValue()));
+ createSample(MetricDimension.SEND_COUNT_FAILED,
metricItem.getSendCountFailed().doubleValue()),
+ createSample(MetricDimension.SEND_DURATION,
metricItem.getSendDuration().doubleValue()));
MetricFamilySamples metricFamilySamples =
- new MetricFamilySamples(metricName, Type.GAUGE,
HELP_DESCRIPTION, samples);
+ new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE,
HELP_DESCRIPTION, samples);
return Collections.singletonList(metricFamilySamples);
}
private MetricFamilySamples.Sample createSample(MetricDimension key,
double value) {
- return new MetricFamilySamples.Sample(metricName,
Collections.singletonList(MetricItem.K_DIMENSION_KEY),
+ return new MetricFamilySamples.Sample(AUDIT_STORE_SERVER_NAME,
+ Collections.singletonList(MetricItem.K_DIMENSION_KEY),
Collections.singletonList(key.getKey()), value);
}
@Override
public void report() {
- LOGGER.info("Report proxy prometheus metric: {} ",
metricItem.toString());
+ LOGGER.info("Report store prometheus metric: {} ",
metricItem.toString());
}
+
+ @Override
+ public void stop() {
+ server.close();
+ }
+
}
\ No newline at end of file
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java
similarity index 90%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java
index 51ed0caf60..1625138068 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java
@@ -15,17 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.service;
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.consts.ConfigConstants;
import org.apache.inlong.audit.file.RemoteConfigJson;
-import org.apache.inlong.audit.service.consume.BaseConsume;
-import org.apache.inlong.audit.service.consume.KafkaConsume;
-import org.apache.inlong.audit.service.consume.PulsarConsume;
-import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.consume.BaseConsume;
+import org.apache.inlong.audit.store.service.consume.KafkaConsume;
+import org.apache.inlong.audit.store.service.consume.PulsarConsume;
+import org.apache.inlong.audit.store.service.consume.TubeConsume;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
import org.apache.inlong.common.pojo.audit.MQInfo;
@@ -46,6 +47,8 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.annotation.PreDestroy;
+
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@@ -100,6 +103,8 @@ public class AuditMsgConsumerServer implements
InitializingBean {
if (mqConsume != null) {
mqConsume.start();
}
+
+ MetricsManager.getInstance().init();
}
/**
@@ -177,4 +182,9 @@ public class AuditMsgConsumerServer implements
InitializingBean {
}
return null;
}
+
+ @PreDestroy
+ public void shutdown() {
+ MetricsManager.getInstance().shutdown();
+ }
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java
similarity index 96%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java
index 7f9bcf8207..ef038177ba 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.service;
import org.apache.inlong.audit.protocol.AuditData;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
similarity index 94%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
index 7aaac09e64..68ba584d73 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.service;
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.db.entities.JdbcDataPo;
import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.entities.JdbcDataPo;
+import org.apache.inlong.audit.store.metric.MetricsManager;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
@@ -113,6 +114,9 @@ public class JdbcService implements InsertData,
AutoCloseable {
private boolean executeBatch(List<JdbcDataPo> dataList) {
boolean result = false;
+
+ long currentTimestamp = System.currentTimeMillis();
+
try (PreparedStatement statement =
connection.prepareStatement(INSERT_SQL)) {
for (JdbcDataPo data : dataList) {
statement.setString(1, data.getIp());
@@ -135,7 +139,12 @@ public class JdbcService implements InsertData,
AutoCloseable {
statement.executeBatch();
connection.commit();
result = true;
+
+ MetricsManager.getInstance().addSendSuccess(dataList.size(),
System.currentTimeMillis() - currentTimestamp);
+
} catch (Exception exception) {
+
+ MetricsManager.getInstance().addSendFailed(dataList.size(),
System.currentTimeMillis() - currentTimestamp);
LOG.error("Execute batch has failure!", exception);
try {
reconnect();
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java
similarity index 85%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java
index e0fbb7180a..c92dcb29f6 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.protocol.AuditData;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
import com.google.gson.Gson;
import org.apache.pulsar.client.api.Consumer;
@@ -56,6 +57,9 @@ public abstract class BaseConsume {
*/
protected void handleMessage(String body) throws Exception {
AuditData msgBody = gson.fromJson(body, AuditData.class);
+
+ MetricsManager.getInstance().addReceiveSuccess(1);
+
this.insertServiceList.forEach((service) -> {
try {
service.insert(msgBody);
@@ -66,6 +70,9 @@ public abstract class BaseConsume {
}
protected void handleMessage(String body, Consumer<byte[]> consumer,
MessageId messageId) {
AuditData msgBody = gson.fromJson(body, AuditData.class);
+
+ MetricsManager.getInstance().addReceiveSuccess(1);
+
this.insertServiceList.forEach((service) -> {
try {
service.insert(msgBody, consumer, messageId);
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java
similarity index 95%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java
index cc0d8cac21..34daef1c41 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
@@ -188,6 +189,7 @@ public class KafkaConsume extends BaseConsume {
}
}
} catch (Exception e) {
+ MetricsManager.getInstance().addReceiveFailed(1);
LOG.error("kafka consumer get message error {}",
e.getMessage());
}
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
similarity index 95%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
index 7d8efc79ce..c1a5fe92f2 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
@@ -131,6 +132,8 @@ public class PulsarConsume extends BaseConsume {
String body = new String(msg.getData(),
StandardCharsets.UTF_8);
handleMessage(body, consumer,
msg.getMessageId());
} catch (Exception e) {
+
MetricsManager.getInstance().addReceiveFailed(1);
+
LOG.error("Consumer has exception topic
{}, subName {}, ex {}",
topic,
mqConfig.getPulsarConsumerSubName(),
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java
similarity index 92%
rename from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
rename to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java
index 06d1b5b189..20cbd60e90 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
@@ -133,12 +134,15 @@ public class TubeConsume extends BaseConsume {
}
pullMessageConsumer.confirmConsume(csmResult.getConfirmContext(), true);
} else {
+ MetricsManager.getInstance().addReceiveFailed(1);
LOG.error("receive messages errorCode is {}, error
meddage is {}", csmResult.getErrCode(),
csmResult.getErrMsg());
}
} catch (TubeClientException e) {
+ MetricsManager.getInstance().addReceiveFailed(1);
LOG.error("tube consumer getMessage error {}",
e.getMessage());
} catch (Exception e) {
+ MetricsManager.getInstance().addReceiveFailed(1);
LOG.error("handle audit message error {}", e.getMessage());
}
diff --git
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java
similarity index 87%
rename from
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
rename to
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java
index 5ddee887bd..4568078f8f 100644
---
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
+++
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
-import org.apache.inlong.audit.service.JdbcService;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.service.InsertData;
+import org.apache.inlong.audit.store.service.JdbcService;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
diff --git
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java
similarity index 97%
rename from
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
rename to
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java
index acc7ccd68f..9cd9b241ef 100644
---
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
+++
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
diff --git
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java
similarity index 88%
rename from
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
rename to
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java
index 4085f29514..d1bb85c82b 100644
---
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
+++
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
-import org.apache.inlong.audit.service.JdbcService;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.service.InsertData;
+import org.apache.inlong.audit.store.service.JdbcService;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
diff --git a/inlong-audit/conf/application.properties
b/inlong-audit/conf/application.properties
index 063b01aa8f..ad1b4e487f 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -53,7 +53,13 @@ audit.store.jdbc.username=root
audit.store.jdbc.password=inlong
############################
-# metric config
+# Audit Proxy metric config
# org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the
default monitoring
###########################
-audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric
\ No newline at end of file
+audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric
+
+############################
+# Audit Store metric config
+# org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric is the
default monitoring
+###########################
+audit.store.metric.classname=org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric
\ No newline at end of file