This is an automated email from the ASF dual-hosted git repository.

oliverwqcwrw pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 59f056d75 [ISSUE #5942] Fix the produce count include the quantity of 
the system topic(#5943)
59f056d75 is described below

commit 59f056d75a317fe1f4cc94390f95bf26b87102a2
Author: Oliver <[email protected]>
AuthorDate: Tue Feb 7 09:51:00 2023 +0800

    [ISSUE #5942] Fix the produce count include the quantity of the system 
topic(#5943)
---
 .../broker/processor/PopReviveService.java         |  2 +-
 .../broker/processor/ReplyMessageProcessor.java    |  2 +-
 .../broker/processor/SendMessageProcessor.java     |  2 +-
 .../broker/schedule/ScheduleMessageService.java    |  2 +-
 .../rocketmq/example/quickstart/Producer.java      |  2 +-
 .../apache/rocketmq/store/stats/BrokerStats.java   |  4 ++--
 .../rocketmq/store/stats/BrokerStatsManager.java   | 27 ++++++++++++++++++++--
 .../rocketmq/store/timer/TimerMessageStore.java    |  2 +-
 .../store/stats/BrokerStatsManagerTest.java        | 13 +++++++++++
 9 files changed, 46 insertions(+), 10 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index d0e9dbc36..95aa52091 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -131,7 +131,7 @@ public class PopReviveService extends ServiceThread {
             return false;
         }
         
this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
-        this.brokerController.getBrokerStatsManager().incBrokerPutNums(1);
+        
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(),
 1);
         
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
         
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
 putMessageResult.getAppendMessageResult().getWroteBytes());
         if (brokerController.getPopMessageProcessor() != null) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index dbc87a870..b2db356c8 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -295,7 +295,7 @@ public class ReplyMessageProcessor extends 
AbstractSendMessageProcessor {
             
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), 
putMessageResult.getAppendMessageResult().getMsgNum(), 1);
             
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                 putMessageResult.getAppendMessageResult().getWroteBytes());
-            
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+            
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), 
putMessageResult.getAppendMessageResult().getMsgNum());
 
             if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
                 Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 45517e1bb..6faa7525b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -429,7 +429,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), 
putMessageResult.getAppendMessageResult().getMsgNum(), 1);
             
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                 putMessageResult.getAppendMessageResult().getWroteBytes());
-            
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+            
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), 
putMessageResult.getAppendMessageResult().getMsgNum());
             
this.brokerController.getBrokerStatsManager().incTopicPutLatency(msg.getTopic(),
 queueIdInt,
                 (int) (this.brokerController.getMessageStore().now() - 
beginTimeMillis));
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 372fb83ea..26e757ab4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -758,7 +758,7 @@ public class ScheduleMessageService extends ConfigManager {
 
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic,
 result.getAppendMessageResult().getMsgNum(), 1);
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic,
 result.getAppendMessageResult().getWroteBytes());
-                
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+                
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic,
 result.getAppendMessageResult().getMsgNum());
 
                 attributes = BrokerMetricsManager.newAttributesBuilder()
                     .put(LABEL_TOPIC, topic)
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java 
b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index b78c85468..2c67e463e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -54,7 +54,7 @@ public class Producer {
          * </pre>
          */
         // Uncomment the following line while debugging, namesrvAddr should be 
set to your local address
-//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
 
         /*
          * Launch the instance.
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index d864dd50a..fb717550f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -43,7 +43,7 @@ public class BrokerStats {
         this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
 
         this.msgPutTotalTodayMorning =
-            
this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+            
this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
         this.msgGetTotalTodayMorning =
             
this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopic();
 
@@ -84,7 +84,7 @@ public class BrokerStats {
     }
 
     public long getMsgPutTotalTodayNow() {
-        return 
this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+        return 
this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
     }
 
     public long getMsgGetTotalTodayNow() {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 132ddc333..2dd3fc5b5 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -76,6 +76,7 @@ public class BrokerStatsManager {
     public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
     public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
     public static final String BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC = 
"BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC";
+    public static final String BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC = 
"BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC";
     public static final String SNDBCK2DLQ_TIMES = "SNDBCK2DLQ_TIMES";
 
     public static final String COMMERCIAL_OWNER = "Owner";
@@ -190,6 +191,8 @@ public class BrokerStatsManager {
         this.statsTable.put(BROKER_CK_NUMS, new StatsItemSet(BROKER_CK_NUMS, 
this.scheduledExecutorService, log));
         this.statsTable.put(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC,
             new StatsItemSet(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, 
this.scheduledExecutorService, log));
+        this.statsTable.put(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
+            new StatsItemSet(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, 
this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_FROM_DISK_NUMS,
             new StatsItemSet(Stats.GROUP_GET_FROM_DISK_NUMS, 
this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_FROM_DISK_SIZE,
@@ -513,11 +516,12 @@ public class BrokerStatsManager {
         
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
     }
 
-    public void incBrokerPutNums(final int incValue) {
+    public void incBrokerPutNums(final String topic, final int incValue) {
         
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
+        incBrokerPutNumsWithoutSystemTopic(topic, incValue);
     }
 
-    public void incBrokerGetNums(String topic, final int incValue) {
+    public void incBrokerGetNums(final String topic, final int incValue) {
         
this.statsTable.get(Stats.BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
         this.incBrokerGetNumsWithoutSystemTopic(topic, incValue);
     }
@@ -537,6 +541,13 @@ public class BrokerStatsManager {
         
this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
     }
 
+    public void incBrokerPutNumsWithoutSystemTopic(final String topic, final 
int incValue) {
+        if (TopicValidator.isSystemTopic(topic)) {
+            return;
+        }
+        
this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
+    }
+
     public long getBrokerGetNumsWithoutSystemTopic() {
         final StatsItemSet statsItemSet = 
this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC);
         if (statsItemSet == null) {
@@ -549,6 +560,18 @@ public class BrokerStatsManager {
         return statsItem.getValue().longValue();
     }
 
+    public long getBrokerPutNumsWithoutSystemTopic() {
+        final StatsItemSet statsItemSet = 
this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC);
+        if (statsItemSet == null) {
+            return 0;
+        }
+        final StatsItem statsItem = 
statsItemSet.getStatsItem(this.clusterName);
+        if (statsItem == null) {
+            return 0;
+        }
+        return statsItem.getValue().longValue();
+    }
+
     public void incSendBackNums(final String group, final String topic) {
         final String statsKey = buildStatsKey(topic, group);
         this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 89b93abd0..c4f7e6c77 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1050,7 +1050,7 @@ public class TimerMessageStore {
                             
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
                             
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
                                 
putMessageResult.getAppendMessageResult().getWroteBytes());
-                            this.brokerStatsManager.incBrokerPutNums(1);
+                            
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
                         }
                         return PUT_OK;
                     case SERVICE_NOT_AVAILABLE:
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
index c32db16dd..a602da093 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
@@ -199,4 +199,17 @@ public class BrokerStatsManagerTest {
             .getValue().doubleValue()).isEqualTo(1L);
         
assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopic()).isEqualTo(1L);
     }
+
+    @Test
+    public void testIncBrokerPutNumsWithoutSystemTopic() {
+        brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TOPIC, 1);
+        
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
 CLUSTER_NAME)
+            .getValue().doubleValue()).isEqualTo(1L);
+        
assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L);
+
+        
brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TopicValidator.RMQ_SYS_TRACE_TOPIC,
 1);
+        
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
 CLUSTER_NAME)
+            .getValue().doubleValue()).isEqualTo(1L);
+        
assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L);
+    }
 }

Reply via email to