This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d8cabc  fix #46 support prometheus collect lmq write read rt metrics 
(#47)
9d8cabc is described below

commit 9d8cabc757cd1459c6290009335f84f8fa00a548
Author: tianliuliu <[email protected]>
AuthorDate: Wed Apr 6 14:50:37 2022 +0800

    fix #46 support prometheus collect lmq write read rt metrics (#47)
---
 .../rocketmq/mqtt/ds/meta/WildcardManager.java     | 14 +++++++++++-
 .../mqtt/ds/store/LmqQueueStoreManager.java        | 25 ++++++++++++++++++----
 .../exporter/collector/MqttMetricsCollector.java   |  4 ++++
 .../mqtt/exporter/collector/MqttMetricsInfo.java   |  4 +++-
 4 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
index e40733e..2048ce1 100644
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
@@ -24,6 +24,8 @@ import org.apache.rocketmq.mqtt.common.model.MqttTopic;
 import org.apache.rocketmq.mqtt.common.model.Trie;
 import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -115,7 +117,17 @@ public class WildcardManager {
             }
             return trie.getNodePath(topic);
         } finally {
-            StatUtil.addInvoke("MatchWildcards", System.currentTimeMillis() - 
start);
+            long rt = System.currentTimeMillis() - start;
+            StatUtil.addInvoke("MatchWildcards", rt);
+            collectMatchActionMetrics(rt);
+        }
+    }
+
+    private void collectMatchActionMetrics(long rt) {
+        try {
+            MqttMetricsCollector.collectLmqReadWriteMatchActionRt(rt, 
"MatchWildcards", "unknown");
+        } catch (PrometheusException e) {
+            logger.error("collect MatchWildcards metrics error", e);
         }
     }
 
diff --git 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index f4f1c63..1d7624a 100644
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
 import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -190,14 +191,18 @@ public class LmqQueueStoreManager implements 
LmqQueueStore {
                         @Override
                         public void onSuccess(SendResult sendResult) {
                             result.complete(toStoreResult(sendResult));
-                            StatUtil.addInvoke("lmqWrite", 
System.currentTimeMillis() - start);
+                            long rt = System.currentTimeMillis() - start;
+                            StatUtil.addInvoke("lmqWrite", rt);
+                            collectLmqReadWriteMatchActionRt("lmqWrite", rt, 
true);
                         }
 
                         @Override
                         public void onException(Throwable e) {
                             logger.error("", e);
                             result.completeExceptionally(e);
-                            StatUtil.addInvoke("lmqWrite", 
System.currentTimeMillis() - start, false);
+                            long rt = System.currentTimeMillis() - start;
+                            StatUtil.addInvoke("lmqWrite", rt, false);
+                            collectLmqReadWriteMatchActionRt("lmqWrite", rt, 
false);
                         }
                     });
         } catch (Throwable e) {
@@ -206,6 +211,14 @@ public class LmqQueueStoreManager implements LmqQueueStore 
{
         return result;
     }
 
+    private void collectLmqReadWriteMatchActionRt(String action, long rt, 
boolean status) {
+        try {
+            MqttMetricsCollector.collectLmqReadWriteMatchActionRt(rt, action, 
String.valueOf(status));
+        } catch (PrometheusException e) {
+            logger.error("", e);
+        }
+    }
+
     @Override
     public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue 
queue, QueueOffset queueOffset, long count) {
         CompletableFuture<PullResult> result = new CompletableFuture<>();
@@ -217,7 +230,9 @@ public class LmqQueueStoreManager implements LmqQueueStore {
                 @Override
                 public void 
onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
                     result.complete(toLmqPullResult(queue, pullResult));
-                    StatUtil.addInvoke("lmqPull", System.currentTimeMillis() - 
start);
+                    long rt = System.currentTimeMillis() - start;
+                    StatUtil.addInvoke("lmqPull", rt);
+                    collectLmqReadWriteMatchActionRt("lmqPull", rt, true);
                     StatUtil.addPv(pullResult.getPullStatus().name(), 1);
                     try {
                         MqttMetricsCollector.collectPullStatusTps(1, 
pullResult.getPullStatus().name());
@@ -230,7 +245,9 @@ public class LmqQueueStoreManager implements LmqQueueStore {
                 public void onException(Throwable e) {
                     logger.error("", e);
                     result.completeExceptionally(e);
-                    StatUtil.addInvoke("lmqPull", System.currentTimeMillis() - 
start, false);
+                    long rt = System.currentTimeMillis() - start;
+                    StatUtil.addInvoke("lmqPull", rt, false);
+                    collectLmqReadWriteMatchActionRt("lmqPull", rt, false);
                 }
             });
         } catch (Throwable e) {
diff --git 
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
 
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
index 820f71b..60a0a1e 100644
--- 
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
+++ 
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -157,6 +157,10 @@ public class MqttMetricsCollector {
         collect(MqttMetricsInfo.PULL_CACHE_STATUS_TPS, val, labels);
     }
 
+    public static void collectLmqReadWriteMatchActionRt(long val, String... 
labels) throws PrometheusException {
+        collect(MqttMetricsInfo.READ_WRITE_MATCH_ACTION_RT, val, labels);
+    }
+
     private static String labels2String(String... labels) {
         StringBuilder sb = new StringBuilder(128);
         for (String label : labels) {
diff --git 
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
 
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
index f3b2e14..8471ce8 100644
--- 
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
+++ 
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
@@ -33,7 +33,9 @@ public enum MqttMetricsInfo {
     PULL_STATUS_TPS(Type.COUNTER, SubSystem.DS, "pull_status_tps_total", "ds 
pull msg status tps.", null,
         "hostName", "hostIp", "pullStatus"),
     PULL_CACHE_STATUS_TPS(Type.COUNTER, SubSystem.CS, 
"pull_cache_status_tps_total", "ds pull cache status tps.", null,
-        "hostName", "hostIp", "pullCacheStatus");
+        "hostName", "hostIp", "pullCacheStatus"),
+    READ_WRITE_MATCH_ACTION_RT(Type.GAUGE, SubSystem.DS, 
"read_write_match_action_rt", "lmq read write match action rt.", null,
+        "hostName", "hostIp", "action", "status");
 
 
     private final Type type;

Reply via email to