This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 75f2e984fb [INLONG-11360][Audit] Add a metric monitoring system for the Audit Store itself (#11363)
75f2e984fb is described below

commit 75f2e984fb95d0fdedf2f80121df352e95cbc4d7
Author: doleyzi <[email protected]>
AuthorDate: Wed Oct 16 18:35:54 2024 +0800

    [INLONG-11360][Audit] Add a metric monitoring system for the Audit Store itself (#11363)
---
 .../apache/inlong/audit/metric/AbstractMetric.java |  1 +
 .../apache/inlong/audit/metric/MetricsManager.java |  1 +
 .../metric/prometheus/ProxyPrometheusMetric.java   |  5 +++
 .../inlong/audit/{ => store}/Application.java      |  2 +-
 .../config/ConfigConstants.java}                   | 23 +++++------
 .../audit/{ => store}/config/JdbcConfig.java       |  2 +-
 .../{ => store}/config/MessageQueueConfig.java     |  2 +-
 .../audit/{ => store}/config/StoreConfig.java      |  2 +-
 .../audit/{db => store}/entities/JdbcDataPo.java   |  2 +-
 .../metric/MetricDimension.java}                   | 27 ++++++-------
 .../metric/MetricItem.java}                        | 38 ++++++++-----------
 .../inlong/audit/store}/metric/MetricsManager.java | 43 ++++++++++-----------
 .../metric/prometheus/StorePrometheusMetric.java}  | 44 +++++++++++++---------
 .../service/AuditMsgConsumerServer.java            | 26 +++++++++----
 .../audit/{ => store}/service/InsertData.java      |  2 +-
 .../audit/{ => store}/service/JdbcService.java     | 15 ++++++--
 .../{ => store}/service/consume/BaseConsume.java   | 15 ++++++--
 .../{ => store}/service/consume/KafkaConsume.java  | 10 +++--
 .../{ => store}/service/consume/PulsarConsume.java | 11 ++++--
 .../{ => store}/service/consume/TubeConsume.java   | 12 ++++--
 .../service/consume/KafkaConsumeTest.java          | 12 +++---
 .../service/consume/PulsarConsumeTest.java         |  2 +-
 .../service/consume/TubeConsumeTest.java           | 12 +++---
 inlong-audit/conf/application.properties           | 10 ++++-
 24 files changed, 181 insertions(+), 138 deletions(-)

diff --git 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
index 4c2f627916..a54995b8d6 100644
--- 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
@@ -20,4 +20,5 @@ package org.apache.inlong.audit.metric;
 public interface AbstractMetric {
 
     public void report();
+    public void stop();
 }
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
index 433fc71848..f27920159b 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
+++ 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
@@ -94,5 +94,6 @@ public class MetricsManager {
     }
     public void shutdown() {
         timer.shutdown();
+        metric.stop();
     }
 }
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
index 07c2397743..0871a613b3 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
+++ 
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
@@ -80,4 +80,9 @@ public class ProxyPrometheusMetric extends Collector 
implements AbstractMetric {
     public void report() {
         LOGGER.info("Report proxy prometheus metric: {} ", 
metricItem.toString());
     }
+
+    @Override
+    public void stop() {
+        server.close();
+    }
 }
\ No newline at end of file
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java
similarity index 97%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java
index 0bcb3aa973..46cc7212eb 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.store;
 
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java
similarity index 59%
copy from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
copy to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java
index 7f9bcf8207..a0585efd37 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java
@@ -15,22 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
-
-import org.apache.inlong.audit.protocol.AuditData;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
+package org.apache.inlong.audit.store.config;
 
 /**
- * Insert Data interface
+ * Config constants
  */
-public interface InsertData {
-
-    /**
-     * insert audit data to storage.
-     */
-    void insert(AuditData msgBody);
+public class ConfigConstants {
 
-    void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId);
+    public static final String AUDIT_STORE_SERVER_NAME = "audit-store";
+    public static final String KEY_PROMETHEUS_PORT = 
"audit.store.prometheus.port";
+    public static final int DEFAULT_PROMETHEUS_PORT = 10083;
+    public static final String KEY_STORE_METRIC_CLASSNAME = 
"audit.store.metric.classname";
+    public static final String DEFAULT_STORE_METRIC_CLASSNAME =
+            
"org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric";
 }
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
similarity index 97%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
index 42249200f0..0b6c7c36bb 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.store.config;
 
 import lombok.Data;
 import org.springframework.beans.factory.annotation.Value;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java
similarity index 98%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java
index 2cfa4b1d34..3a4e6c6cae 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.store.config;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java
similarity index 96%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java
index ca3358701e..abee9852fe 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.store.config;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java
similarity index 96%
copy from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
copy to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java
index ebc42f4a53..0c568225b3 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.db.entities;
+package org.apache.inlong.audit.store.entities;
 
 import lombok.Data;
 import org.apache.pulsar.client.api.Consumer;
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
similarity index 64%
copy from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
copy to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
index 7f9bcf8207..02c2258dbc 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.metric;
 
-import org.apache.inlong.audit.protocol.AuditData;
+public enum MetricDimension {
 
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
+    RECEIVE_COUNT_SUCCESS("receiveCountSuccess"),
+    RECEIVE_FAILED("receiveFailed"),
+    SEND_COUNT_SUCCESS("sendCountSuccess"),
+    SEND_COUNT_FAILED("sendCountFailed"),
+    SEND_DURATION("sendDuration");
 
-/**
- * Insert Data interface
- */
-public interface InsertData {
+    private final String key;
 
-    /**
-     * insert audit data to storage.
-     */
-    void insert(AuditData msgBody);
+    MetricDimension(String key) {
+        this.key = key;
+    }
 
-    void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId);
+    public String getKey() {
+        return key;
+    }
 }
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
similarity index 54%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
index ebc42f4a53..0e5dd9ad18 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
@@ -15,32 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.db.entities;
+package org.apache.inlong.audit.store.metric;
 
 import lombok.Data;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
 
-import java.sql.Timestamp;
+import java.util.concurrent.atomic.AtomicLong;
 
 @Data
-public class JdbcDataPo {
+public class MetricItem {
 
-    private String ip;
-    private String dockerId;
-    private String threadId;
-    private Timestamp sdkTs;
-    private Long packetId;
-    private Timestamp logTs;
-    private String inLongGroupId;
-    private String inLongStreamId;
-    private String auditId;
-    private String auditTag;
-    private long auditVersion;
-    private Long count;
-    private Long size;
-    private Long delay;
-    private Timestamp updateTime;
-    private Consumer<byte[]> consumer;
-    private MessageId messageId;
+    public static final String K_DIMENSION_KEY = "dimensionName";
+    private AtomicLong receiveCountSuccess = new AtomicLong(0);
+    private AtomicLong receiveFailed = new AtomicLong(0);
+    private AtomicLong sendCountSuccess = new AtomicLong(0);
+    private AtomicLong sendCountFailed = new AtomicLong(0);
+    private AtomicLong sendDuration = new AtomicLong(0);
+    public void resetAllMetrics() {
+        receiveCountSuccess.set(0);
+        receiveFailed.set(0);
+        sendCountSuccess.set(0);
+        sendCountFailed.set(0);
+        sendDuration.set(0);
+    }
 }
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
similarity index 66%
copy from 
inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
copy to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
index 433fc71848..68b69609cf 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.metric;
+package org.apache.inlong.audit.store.metric;
 
 import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.AbstractMetric;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,10 +29,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME;
+import static 
org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_STORE_METRIC_CLASSNAME;
+import static 
org.apache.inlong.audit.store.config.ConfigConstants.KEY_STORE_METRIC_CLASSNAME;
 
 public class MetricsManager {
 
@@ -44,16 +43,15 @@ public class MetricsManager {
 
     private AbstractMetric metric;
 
-    public void init(String metricName) {
+    public void init() {
         try {
-            ConfigManager configManager = ConfigManager.getInstance();
-            String metricClassName = 
configManager.getValue(KEY_PROXY_METRIC_CLASSNAME, 
DEFAULT_PROXY_METRIC_CLASSNAME);
+            String metricClassName =
+                    
ConfigManager.getInstance().getValue(KEY_STORE_METRIC_CLASSNAME, 
DEFAULT_STORE_METRIC_CLASSNAME);
             LOGGER.info("Metric class name: {}", metricClassName);
             Constructor<?> constructor = Class.forName(metricClassName)
-                    .getDeclaredConstructor(String.class, MetricItem.class, 
int.class);
+                    .getDeclaredConstructor(MetricItem.class);
             constructor.setAccessible(true);
-            metric = (AbstractMetric) constructor.newInstance(metricName, 
metricItem,
-                    configManager.getValue(KEY_PROMETHEUS_PORT, 
DEFAULT_PROMETHEUS_PORT));
+            metric = (AbstractMetric) constructor.newInstance(metricItem);
 
             timer.scheduleWithFixedDelay(() -> {
                 metric.report();
@@ -72,27 +70,26 @@ public class MetricsManager {
     private final MetricItem metricItem = new MetricItem();
     protected final ScheduledExecutorService timer = 
Executors.newSingleThreadScheduledExecutor();
 
-    public void addReceiveCountInvalid(long count) {
-        metricItem.getReceiveCountInvalid().addAndGet(count);
-    }
-
-    public void addReceiveCountExpired(long count) {
-        metricItem.getReceiveCountExpired().addAndGet(count);
+    public void addReceiveSuccess(long count) {
+        metricItem.getReceiveCountSuccess().addAndGet(count);
     }
 
-    public void addReceiveSuccess(long count, long pack, long size) {
-        metricItem.getReceiveCountSuccess().addAndGet(count);
-        metricItem.getReceivePackSuccess().addAndGet(pack);
-        metricItem.getReceiveSizeSuccess().addAndGet(size);
+    public void addReceiveFailed(long pack) {
+        metricItem.getReceiveFailed().addAndGet(pack);
     }
 
-    public void addSendSuccess(long count) {
+    public void addSendSuccess(long count, long duration) {
         metricItem.getSendCountSuccess().addAndGet(count);
+        metricItem.getSendDuration().addAndGet(duration);
     }
-    public void addSendFailed(long count) {
+
+    public void addSendFailed(long count, long duration) {
         metricItem.getSendCountFailed().addAndGet(count);
+        metricItem.getSendDuration().addAndGet(duration);
     }
+
     public void shutdown() {
         timer.shutdown();
+        metric.stop();
     }
 }
diff --git 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
similarity index 60%
copy from 
inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
copy to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
index 07c2397743..6aa60feb6c 100644
--- 
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.metric.prometheus;
+package org.apache.inlong.audit.store.metric.prometheus;
 
+import org.apache.inlong.audit.file.ConfigManager;
 import org.apache.inlong.audit.metric.AbstractMetric;
-import org.apache.inlong.audit.metric.MetricDimension;
-import org.apache.inlong.audit.metric.MetricItem;
+import org.apache.inlong.audit.store.metric.MetricDimension;
+import org.apache.inlong.audit.store.metric.MetricItem;
 
 import io.prometheus.client.Collector;
 import io.prometheus.client.exporter.HTTPServer;
@@ -31,26 +32,28 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.inlong.audit.store.config.ConfigConstants.AUDIT_STORE_SERVER_NAME;
+import static 
org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
+import static 
org.apache.inlong.audit.store.config.ConfigConstants.KEY_PROMETHEUS_PORT;
+
 /**
  * PrometheusMetric
  */
-public class ProxyPrometheusMetric extends Collector implements AbstractMetric 
{
+public class StorePrometheusMetric extends Collector implements AbstractMetric 
{
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyPrometheusMetric.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StorePrometheusMetric.class);
     private static final String HELP_DESCRIPTION = "help";
 
     private final MetricItem metricItem;
-    private final String metricName;
     private HTTPServer server;
 
-    public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int 
prometheusPort) {
-        this.metricName = metricName;
+    public StorePrometheusMetric(MetricItem metricItem) {
         this.metricItem = metricItem;
         try {
-            server = new HTTPServer(prometheusPort);
+            server = new 
HTTPServer(ConfigManager.getInstance().getValue(KEY_PROMETHEUS_PORT, 
DEFAULT_PROMETHEUS_PORT));
             this.register();
         } catch (IOException e) {
-            LOGGER.error("Construct proxy prometheus metric has IOException", 
e);
+            LOGGER.error("Construct store prometheus metric has IOException", 
e);
         }
     }
 
@@ -58,26 +61,31 @@ public class ProxyPrometheusMetric extends Collector 
implements AbstractMetric {
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples.Sample> samples = Arrays.asList(
                 createSample(MetricDimension.RECEIVE_COUNT_SUCCESS, 
metricItem.getReceiveCountSuccess().doubleValue()),
-                createSample(MetricDimension.RECEIVE_PACK_SUCCESS, 
metricItem.getReceivePackSuccess().doubleValue()),
-                createSample(MetricDimension.RECEIVE_SIZE_SUCCESS, 
metricItem.getReceiveSizeSuccess().doubleValue()),
-                createSample(MetricDimension.RECEIVE_COUNT_INVALID, 
metricItem.getReceiveCountInvalid().doubleValue()),
-                createSample(MetricDimension.RECEIVE_COUNT_EXPIRED, 
metricItem.getReceiveCountExpired().doubleValue()),
+                createSample(MetricDimension.RECEIVE_FAILED, 
metricItem.getReceiveFailed().doubleValue()),
                 createSample(MetricDimension.SEND_COUNT_SUCCESS, 
metricItem.getSendCountSuccess().doubleValue()),
-                createSample(MetricDimension.SEND_COUNT_FAILED, 
metricItem.getSendCountFailed().doubleValue()));
+                createSample(MetricDimension.SEND_COUNT_FAILED, 
metricItem.getSendCountFailed().doubleValue()),
+                createSample(MetricDimension.SEND_DURATION, 
metricItem.getSendDuration().doubleValue()));
 
         MetricFamilySamples metricFamilySamples =
-                new MetricFamilySamples(metricName, Type.GAUGE, 
HELP_DESCRIPTION, samples);
+                new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE, 
HELP_DESCRIPTION, samples);
 
         return Collections.singletonList(metricFamilySamples);
     }
 
     private MetricFamilySamples.Sample createSample(MetricDimension key, 
double value) {
-        return new MetricFamilySamples.Sample(metricName, 
Collections.singletonList(MetricItem.K_DIMENSION_KEY),
+        return new MetricFamilySamples.Sample(AUDIT_STORE_SERVER_NAME,
+                Collections.singletonList(MetricItem.K_DIMENSION_KEY),
                 Collections.singletonList(key.getKey()), value);
     }
 
     @Override
     public void report() {
-        LOGGER.info("Report proxy prometheus metric: {} ", 
metricItem.toString());
+        LOGGER.info("Report store prometheus metric: {} ", 
metricItem.toString());
     }
+
+    @Override
+    public void stop() {
+        server.close();
+    }
+
 }
\ No newline at end of file
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java
similarity index 90%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java
index 51ed0caf60..1625138068 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.service;
 
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.consts.ConfigConstants;
 import org.apache.inlong.audit.file.RemoteConfigJson;
-import org.apache.inlong.audit.service.consume.BaseConsume;
-import org.apache.inlong.audit.service.consume.KafkaConsume;
-import org.apache.inlong.audit.service.consume.PulsarConsume;
-import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.consume.BaseConsume;
+import org.apache.inlong.audit.store.service.consume.KafkaConsume;
+import org.apache.inlong.audit.store.service.consume.PulsarConsume;
+import org.apache.inlong.audit.store.service.consume.TubeConsume;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
 import org.apache.inlong.common.pojo.audit.MQInfo;
@@ -46,6 +47,8 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PreDestroy;
+
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -100,6 +103,8 @@ public class AuditMsgConsumerServer implements 
InitializingBean {
         if (mqConsume != null) {
             mqConsume.start();
         }
+
+        MetricsManager.getInstance().init();
     }
 
     /**
@@ -177,4 +182,9 @@ public class AuditMsgConsumerServer implements 
InitializingBean {
         }
         return null;
     }
+
+    @PreDestroy
+    public void shutdown() {
+        MetricsManager.getInstance().shutdown();
+    }
 }
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java
similarity index 96%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java
index 7f9bcf8207..ef038177ba 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.service;
 
 import org.apache.inlong.audit.protocol.AuditData;
 
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
similarity index 94%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
index 7aaac09e64..68ba584d73 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.store.service;
 
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.db.entities.JdbcDataPo;
 import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.entities.JdbcDataPo;
+import org.apache.inlong.audit.store.metric.MetricsManager;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
@@ -113,6 +114,9 @@ public class JdbcService implements InsertData, 
AutoCloseable {
 
     private boolean executeBatch(List<JdbcDataPo> dataList) {
         boolean result = false;
+
+        long currentTimestamp = System.currentTimeMillis();
+
         try (PreparedStatement statement = 
connection.prepareStatement(INSERT_SQL)) {
             for (JdbcDataPo data : dataList) {
                 statement.setString(1, data.getIp());
@@ -135,7 +139,12 @@ public class JdbcService implements InsertData, 
AutoCloseable {
             statement.executeBatch();
             connection.commit();
             result = true;
+
+            MetricsManager.getInstance().addSendSuccess(dataList.size(), 
System.currentTimeMillis() - currentTimestamp);
+
         } catch (Exception exception) {
+
+            MetricsManager.getInstance().addSendFailed(dataList.size(), 
System.currentTimeMillis() - currentTimestamp);
             LOG.error("Execute batch has failure!", exception);
             try {
                 reconnect();
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java
similarity index 85%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java
index e0fbb7180a..c92dcb29f6 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.protocol.AuditData;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
 
 import com.google.gson.Gson;
 import org.apache.pulsar.client.api.Consumer;
@@ -56,6 +57,9 @@ public abstract class BaseConsume {
      */
     protected void handleMessage(String body) throws Exception {
         AuditData msgBody = gson.fromJson(body, AuditData.class);
+
+        MetricsManager.getInstance().addReceiveSuccess(1);
+
         this.insertServiceList.forEach((service) -> {
             try {
                 service.insert(msgBody);
@@ -66,6 +70,9 @@ public abstract class BaseConsume {
     }
     protected void handleMessage(String body, Consumer<byte[]> consumer, MessageId messageId) {
         AuditData msgBody = gson.fromJson(body, AuditData.class);
+
+        MetricsManager.getInstance().addReceiveSuccess(1);
+
         this.insertServiceList.forEach((service) -> {
             try {
                 service.insert(msgBody, consumer, messageId);
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java
similarity index 95%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java
index cc0d8cac21..34daef1c41 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
@@ -188,6 +189,7 @@ public class KafkaConsume extends BaseConsume {
                         }
                     }
                 } catch (Exception e) {
+                    MetricsManager.getInstance().addReceiveFailed(1);
                     LOG.error("kafka consumer get message error {}", e.getMessage());
                 }
             }
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
similarity index 95%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
index 7d8efc79ce..c1a5fe92f2 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
@@ -131,6 +132,8 @@ public class PulsarConsume extends BaseConsume {
                                     String body = new String(msg.getData(), StandardCharsets.UTF_8);
                                     handleMessage(body, consumer, msg.getMessageId());
                                 } catch (Exception e) {
+                                    MetricsManager.getInstance().addReceiveFailed(1);
+
                                     LOG.error("Consumer has exception topic {}, subName {}, ex {}",
                                             topic,
                                             mqConfig.getPulsarConsumerSubName(),
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java
similarity index 92%
rename from 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
rename to 
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java
index 06d1b5b189..20cbd60e90 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.service.InsertData;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
@@ -133,12 +134,15 @@ public class TubeConsume extends BaseConsume {
                         }
                         pullMessageConsumer.confirmConsume(csmResult.getConfirmContext(), true);
                     } else {
+                        MetricsManager.getInstance().addReceiveFailed(1);
                         LOG.error("receive messages errorCode is {}, error meddage is {}", csmResult.getErrCode(),
                                 csmResult.getErrMsg());
                     }
                 } catch (TubeClientException e) {
+                    MetricsManager.getInstance().addReceiveFailed(1);
                     LOG.error("tube consumer getMessage error {}", e.getMessage());
                 } catch (Exception e) {
+                    MetricsManager.getInstance().addReceiveFailed(1);
                     LOG.error("handle audit message error {}", e.getMessage());
                 }
 
diff --git 
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
 
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java
similarity index 87%
rename from 
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
rename to 
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java
index 5ddee887bd..4568078f8f 100644
--- 
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
+++ 
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
-import org.apache.inlong.audit.service.JdbcService;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.service.InsertData;
+import org.apache.inlong.audit.store.service.JdbcService;
 
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
diff --git 
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
 
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java
similarity index 97%
rename from 
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
rename to 
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java
index acc7ccd68f..9cd9b241ef 100644
--- 
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
+++ 
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
diff --git 
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
 
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java
similarity index 88%
rename from 
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
rename to 
inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java
index 4085f29514..d1bb85c82b 100644
--- 
a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
+++ 
b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.consume;
+package org.apache.inlong.audit.store.service.consume;
 
-import org.apache.inlong.audit.config.JdbcConfig;
-import org.apache.inlong.audit.config.MessageQueueConfig;
-import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.service.InsertData;
-import org.apache.inlong.audit.service.JdbcService;
+import org.apache.inlong.audit.store.config.JdbcConfig;
+import org.apache.inlong.audit.store.config.MessageQueueConfig;
+import org.apache.inlong.audit.store.config.StoreConfig;
+import org.apache.inlong.audit.store.service.InsertData;
+import org.apache.inlong.audit.store.service.JdbcService;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
diff --git a/inlong-audit/conf/application.properties 
b/inlong-audit/conf/application.properties
index 063b01aa8f..ad1b4e487f 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -53,7 +53,13 @@ audit.store.jdbc.username=root
 audit.store.jdbc.password=inlong
 
 ############################
-# metric config
+# Audit Proxy metric config
 # org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the default monitoring
 ###########################
-audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric
\ No newline at end of file
+audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric
+
+############################
+# Audit Store metric config
+# org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric is the default monitoring
+###########################
+audit.store.metric.classname=org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric
\ No newline at end of file


Reply via email to