This is an automated email from the ASF dual-hosted git repository.
fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a9e1bcbe5 optimize metrics in pop processor (#5580)
a9e1bcbe5 is described below
commit a9e1bcbe5b2d7e78bc7598d7139b6876d8502e72
Author: SSpirits <[email protected]>
AuthorDate: Wed Nov 23 17:14:03 2022 +0800
optimize metrics in pop processor (#5580)
---
.../broker/processor/PeekMessageProcessor.java | 20 ++++++++++----------
.../broker/processor/PopMessageProcessor.java | 2 +-
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 020ee194d..12036666b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -184,16 +184,6 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
- if
(!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
- .put(LABEL_TOPIC, requestHeader.getTopic())
- .put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
- .put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
- .build();
-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
- }
-
if
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills =
this.brokerController.getMessageStore().now();
final byte[] r =
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId());
@@ -253,6 +243,16 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
requestHeader.getMaxMsgNums() -
getMessageResult.getMessageMapedList().size(), null);
}
if (getMessageTmpResult != null) {
+ if (!getMessageTmpResult.getMessageMapedList().isEmpty() &&
!isRetry) {
+ Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
+ .put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+ .build();
+
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
+
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
+ }
+
for (SelectMappedBufferResult mapedBuffer :
getMessageTmpResult.getMessageMapedList()) {
getMessageResult.addMessage(mapedBuffer);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 481fdcab7..25beddb6d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -548,7 +548,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
topic,
getMessageTmpResult.getBufferTotalSize());
- if
(!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+ if (!isRetry) {
Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())