This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 9d8cabc fix #46 support prometheus collect lmq write read rt metrics (#47)
9d8cabc is described below
commit 9d8cabc757cd1459c6290009335f84f8fa00a548
Author: tianliuliu <[email protected]>
AuthorDate: Wed Apr 6 14:50:37 2022 +0800
fix #46 support prometheus collect lmq write read rt metrics (#47)
---
.../rocketmq/mqtt/ds/meta/WildcardManager.java | 14 +++++++++++-
.../mqtt/ds/store/LmqQueueStoreManager.java | 25 ++++++++++++++++++----
.../exporter/collector/MqttMetricsCollector.java | 4 ++++
.../mqtt/exporter/collector/MqttMetricsInfo.java | 4 +++-
4 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
index e40733e..2048ce1 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
@@ -24,6 +24,8 @@ import org.apache.rocketmq.mqtt.common.model.MqttTopic;
import org.apache.rocketmq.mqtt.common.model.Trie;
import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -115,7 +117,17 @@ public class WildcardManager {
}
return trie.getNodePath(topic);
} finally {
- StatUtil.addInvoke("MatchWildcards", System.currentTimeMillis() - start);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("MatchWildcards", rt);
+ collectMatchActionMetrics(rt);
+ }
+ }
+
+ private void collectMatchActionMetrics(long rt) {
+ try {
+ MqttMetricsCollector.collectLmqReadWriteMatchActionRt(rt, "MatchWildcards", "unknown");
+ } catch (PrometheusException e) {
+ logger.error("collect MatchWildcards metrics error", e);
}
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index f4f1c63..1d7624a 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,14 +191,18 @@ public class LmqQueueStoreManager implements LmqQueueStore {
@Override
public void onSuccess(SendResult sendResult) {
result.complete(toStoreResult(sendResult));
- StatUtil.addInvoke("lmqWrite", System.currentTimeMillis() - start);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqWrite", rt);
+ collectLmqReadWriteMatchActionRt("lmqWrite", rt, true);
}
@Override
public void onException(Throwable e) {
logger.error("", e);
result.completeExceptionally(e);
- StatUtil.addInvoke("lmqWrite", System.currentTimeMillis() - start, false);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqWrite", rt, false);
+ collectLmqReadWriteMatchActionRt("lmqWrite", rt, false);
}
});
} catch (Throwable e) {
@@ -206,6 +211,14 @@ public class LmqQueueStoreManager implements LmqQueueStore {
return result;
}
+ private void collectLmqReadWriteMatchActionRt(String action, long rt, boolean status) {
+ try {
+ MqttMetricsCollector.collectLmqReadWriteMatchActionRt(rt, action, String.valueOf(status));
+ } catch (PrometheusException e) {
+ logger.error("", e);
+ }
+ }
+
@Override
public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count) {
CompletableFuture<PullResult> result = new CompletableFuture<>();
@@ -217,7 +230,9 @@ public class LmqQueueStoreManager implements LmqQueueStore {
@Override
public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
result.complete(toLmqPullResult(queue, pullResult));
- StatUtil.addInvoke("lmqPull", System.currentTimeMillis() - start);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPull", rt);
+ collectLmqReadWriteMatchActionRt("lmqPull", rt, true);
StatUtil.addPv(pullResult.getPullStatus().name(), 1);
try {
MqttMetricsCollector.collectPullStatusTps(1, pullResult.getPullStatus().name());
@@ -230,7 +245,9 @@ public class LmqQueueStoreManager implements LmqQueueStore {
public void onException(Throwable e) {
logger.error("", e);
result.completeExceptionally(e);
- StatUtil.addInvoke("lmqPull", System.currentTimeMillis() - start, false);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPull", rt, false);
+ collectLmqReadWriteMatchActionRt("lmqPull", rt, false);
}
});
} catch (Throwable e) {
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
index 820f71b..60a0a1e 100644
--- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -157,6 +157,10 @@ public class MqttMetricsCollector {
collect(MqttMetricsInfo.PULL_CACHE_STATUS_TPS, val, labels);
}
+ public static void collectLmqReadWriteMatchActionRt(long val, String... labels) throws PrometheusException {
+ collect(MqttMetricsInfo.READ_WRITE_MATCH_ACTION_RT, val, labels);
+ }
+
private static String labels2String(String... labels) {
StringBuilder sb = new StringBuilder(128);
for (String label : labels) {
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
index f3b2e14..8471ce8 100644
--- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
@@ -33,7 +33,9 @@ public enum MqttMetricsInfo {
PULL_STATUS_TPS(Type.COUNTER, SubSystem.DS, "pull_status_tps_total", "ds pull msg status tps.", null,
"hostName", "hostIp", "pullStatus"),
PULL_CACHE_STATUS_TPS(Type.COUNTER, SubSystem.CS, "pull_cache_status_tps_total", "ds pull cache status tps.", null,
- "hostName", "hostIp", "pullCacheStatus");
+ "hostName", "hostIp", "pullCacheStatus"),
+ READ_WRITE_MATCH_ACTION_RT(Type.GAUGE, SubSystem.DS, "read_write_match_action_rt", "lmq read write match action rt.", null,
+ "hostName", "hostIp", "action", "status");
private final Type type;