This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push:
new 95fbf79 feat: add metrics for topic dimension read and write msg size
(#271)
95fbf79 is described below
commit 95fbf7936a53444a135ac8ecc45742bde62e86c6
Author: yx9o <[email protected]>
AuthorDate: Fri May 10 20:58:57 2024 +0800
feat: add metrics for topic dimension read and write msg size (#271)
---
.../rocketmq/mqtt/cs/session/loop/QueueCache.java | 35 +++++++++++++++-------
.../mqtt/ds/store/LmqQueueStoreManager.java | 14 +++++++++
.../ds/upstream/processor/PublishProcessor.java | 11 ++++++-
.../exporter/collector/MqttMetricsCollector.java | 4 +++
.../mqtt/exporter/collector/MqttMetricsInfo.java | 4 ++-
5 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index 771d09f..52f91b8 100644
---
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -52,6 +53,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static org.apache.rocketmq.mqtt.cs.session.loop.PullResultStatus.DONE;
import static org.apache.rocketmq.mqtt.cs.session.loop.PullResultStatus.LATER;
@@ -183,7 +185,7 @@ public class QueueCache {
CompletableFuture<PullResult>
callBackResult) {
if (subscription.isP2p() || subscription.isRetry()) {
StatUtil.addPv("NotPullCache", 1);
- collectorPullCacheStatus("NotPullCache");
+ collectorPullCacheStatus("NotPullCache", null);
CompletableFuture<PullResult> pullResult =
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset,
count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -198,7 +200,7 @@ public class QueueCache {
CacheEntry cacheEntry = cache.getIfPresent(queue);
if (cacheEntry == null) {
StatUtil.addPv("NoPullCache", 1);
- collectorPullCacheStatus("NotPullCache");
+ collectorPullCacheStatus("NotPullCache", null);
CompletableFuture<PullResult> pullResult =
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset,
count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -206,7 +208,7 @@ public class QueueCache {
if (cacheEntry.loading.get()) {
if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) {
StatUtil.addPv("LoadPullCacheTimeout", 1);
- collectorPullCacheStatus("LoadPullCacheTimeout");
+ collectorPullCacheStatus("LoadPullCacheTimeout", null);
CompletableFuture<PullResult> pullResult =
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset,
count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -217,12 +219,12 @@ public class QueueCache {
List<Message> cacheMsgList = cacheEntry.messageList;
if (cacheMsgList.isEmpty()) {
if (loadEvent.get(queue) != null) {
- collectorPullCacheStatus("EmptyPullCacheLATER");
+ collectorPullCacheStatus("EmptyPullCacheLATER", cacheMsgList);
StatUtil.addPv("EmptyPullCacheLATER", 1);
return LATER;
}
StatUtil.addPv("EmptyPullCache", 1);
- collectorPullCacheStatus("EmptyPullCache");
+ collectorPullCacheStatus("EmptyPullCache", cacheMsgList);
CompletableFuture<PullResult> pullResult =
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset,
count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -230,7 +232,7 @@ public class QueueCache {
if (queueOffset.getOffset() < cacheMsgList.get(0).getOffset()) {
StatUtil.addPv("OutPullCache", 1);
- collectorPullCacheStatus("OutPullCache");
+ collectorPullCacheStatus("OutPullCache", cacheMsgList);
CompletableFuture<PullResult> pullResult =
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset,
count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -249,11 +251,11 @@ public class QueueCache {
if (resultMsgs.isEmpty()) {
if (loadEvent.get(queue) != null) {
StatUtil.addPv("PullCacheLATER", 1);
- collectorPullCacheStatus("PullCacheLATER");
+ collectorPullCacheStatus("PullCacheLATER", resultMsgs);
return LATER;
}
StatUtil.addPv("OutPullCache2", 1);
- collectorPullCacheStatus("OutPullCache2");
+ collectorPullCacheStatus("OutPullCache2", resultMsgs);
CompletableFuture<PullResult> pullResult =
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset,
count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -262,21 +264,34 @@ public class QueueCache {
pullResult.setMessageList(resultMsgs);
callBackResult.complete(pullResult);
StatUtil.addPv("PullFromCache", 1);
- collectorPullCacheStatus("PullFromCache");
+ collectorPullCacheStatus("PullFromCache", resultMsgs);
if (loadEvent.get(queue) != null) {
return LATER;
}
return DONE;
}
- private void collectorPullCacheStatus(String pullCacheStatus) {
+ private void collectorPullCacheStatus(String pullCacheStatus,
List<Message> resultMsgs) {
try {
MqttMetricsCollector.collectPullCacheStatusTps(1, pullCacheStatus);
+ collectReadBytes(resultMsgs);
} catch (Throwable e) {
logger.error("", e);
}
}
+ private void collectReadBytes(List<Message> msgFoundList) throws
PrometheusException {
+ if (null == msgFoundList || msgFoundList.isEmpty()) {
+ return;
+ }
+ Map<String, Integer> maps = msgFoundList.stream()
+ .collect(Collectors.groupingBy(Message::getFirstTopic,
+ Collectors.summingInt(msg ->
msg.getPayload().length)));
+ for (Map.Entry<String, Integer> entry : maps.entrySet()) {
+
MqttMetricsCollector.collectReadWriteMatchActionBytes(entry.getValue(),
entry.getKey(), "pullCache");
+ }
+ }
+
private void loadCache(boolean isFirst, String firstTopic, Queue queue,
QueueOffset queueOffset, int count,
QueueEvent event) {
loadStatus.put(queue, true);
diff --git
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 49fe87e..04b9415 100644
---
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -255,6 +255,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
StatUtil.addPv(pullResult.getPullStatus().name(), 1);
try {
MqttMetricsCollector.collectPullStatusTps(1,
pullResult.getPullStatus().name());
+ collectReadBytes(pullResult.getMsgFoundList());
} catch (Throwable e) {
logger.error("collect prometheus error", e);
}
@@ -506,6 +507,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
StatUtil.addPv(popResult.getPopStatus().name(), 1);
try {
MqttMetricsCollector.collectPullStatusTps(1,
popResult.getPopStatus().name());
+ collectReadBytes(popResult.getMsgFoundList());
} catch (Throwable e) {
logger.error("collect prometheus error", e);
}
@@ -636,4 +638,16 @@ public class LmqQueueStoreManager implements LmqQueueStore
{
mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(),
timeoutMillis, ackCallback, ackMessageRequestHeader);
}
+
+ private void collectReadBytes(List<MessageExt> msgFoundList) throws
PrometheusException {
+ if (null == msgFoundList || msgFoundList.isEmpty()) {
+ return;
+ }
+ Map<String, Integer> maps = msgFoundList.stream()
+ .collect(Collectors.groupingBy(MessageExt::getTopic,
+ Collectors.summingInt(msg -> msg.getBody().length)));
+ for (Map.Entry<String, Integer> entry : maps.entrySet()) {
+
MqttMetricsCollector.collectReadWriteMatchActionBytes(entry.getValue(),
entry.getKey(), "pull");
+ }
+ }
}
diff --git
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index 8de2726..ed00576 100644
---
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.meta.WildcardManager;
import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -105,7 +106,7 @@ public class PublishProcessor implements UpstreamProcessor,
WillMsgSender {
message.setMsgId(msgId);
message.setBornTimestamp(bornTime);
message.setEmpty(isEmpty);
-
+ collectWriteBytes(message.getFirstTopic(),
message.getPayload().length);
return lmqQueueStore.putMessage(queueNames, message);
}
@@ -116,4 +117,12 @@ public class PublishProcessor implements
UpstreamProcessor, WillMsgSender {
ctx.setClientId(clientId);
return put(ctx, message);
}
+
+ private void collectWriteBytes(String topic, int length) {
+ try {
+ MqttMetricsCollector.collectReadWriteMatchActionBytes(length,
topic, "put");
+ } catch (Throwable e) {
+ logger.error("Collect prometheus error", e);
+ }
+ }
}
diff --git
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
index d28e5b5..2580dd4 100644
---
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
+++
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -169,6 +169,10 @@ public class MqttMetricsCollector {
collect(MqttMetricsInfo.CONNECTIONS_SIZE, val, labels);
}
+ public static void collectReadWriteMatchActionBytes(long val, String...
labels) throws PrometheusException {
+ collect(MqttMetricsInfo.READ_WRITE_MATCH_ACTION_BYTES, val, labels);
+ }
+
private static String labels2String(String... labels) {
StringBuilder sb = new StringBuilder(128);
for (String label : labels) {
diff --git
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
index a395578..743f9d9 100644
---
a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
+++
b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
@@ -37,7 +37,9 @@ public enum MqttMetricsInfo {
READ_WRITE_MATCH_ACTION_RT(Type.GAUGE, SubSystem.DS,
"read_write_match_action_rt", "lmq read write match action rt.", null,
"hostName", "hostIp", "action", "status"),
CONNECTIONS_SIZE(Type.GAUGE, SubSystem.CS, "connections_size", "server
connections size.", null,
- "hostName", "hostIp");
+ "hostName", "hostIp"),
+ READ_WRITE_MATCH_ACTION_BYTES(Type.COUNTER, SubSystem.DS,
"read_write_match_action_bytes", "lmq read write match action bytes.", null,
+ "hostName", "hostIp", "topic", "action");
private final Type type;