This is an automated email from the ASF dual-hosted git repository.

fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a9e1bcbe5 optimize metrics in pop processor (#5580)
a9e1bcbe5 is described below

commit a9e1bcbe5b2d7e78bc7598d7139b6876d8502e72
Author: SSpirits <[email protected]>
AuthorDate: Wed Nov 23 17:14:03 2022 +0800

    optimize metrics in pop processor (#5580)
---
 .../broker/processor/PeekMessageProcessor.java       | 20 ++++++++++----------
 .../broker/processor/PopMessageProcessor.java        |  2 +-
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 020ee194d..12036666b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -184,16 +184,6 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
 
                 
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
 
-                if 
(!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
-                    Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
-                        .put(LABEL_TOPIC, requestHeader.getTopic())
-                        .put(LABEL_CONSUMER_GROUP, 
requestHeader.getConsumerGroup())
-                        .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(requestHeader.getTopic()) || 
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
-                        .build();
-                    
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), 
attributes);
-                    
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
 attributes);
-                }
-
                 if 
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                     final long beginTimeMills = 
this.brokerController.getMessageStore().now();
                     final byte[] r = 
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId());
@@ -253,6 +243,16 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
                 requestHeader.getMaxMsgNums() - 
getMessageResult.getMessageMapedList().size(), null);
         }
         if (getMessageTmpResult != null) {
+            if (!getMessageTmpResult.getMessageMapedList().isEmpty() && 
!isRetry) {
+                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_TOPIC, requestHeader.getTopic())
+                    .put(LABEL_CONSUMER_GROUP, 
requestHeader.getConsumerGroup())
+                    .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(requestHeader.getTopic()) || 
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+                    .build();
+                
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), 
attributes);
+                
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
 attributes);
+            }
+
             for (SelectMappedBufferResult mapedBuffer : 
getMessageTmpResult.getMessageMapedList()) {
                 getMessageResult.addMessage(mapedBuffer);
             }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 481fdcab7..25beddb6d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -548,7 +548,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
 topic,
                     getMessageTmpResult.getBufferTotalSize());
 
-                if 
(!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+                if (!isRetry) {
                     Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
                         .put(LABEL_TOPIC, requestHeader.getTopic())
                         .put(LABEL_CONSUMER_GROUP, 
requestHeader.getConsumerGroup())

Reply via email to