This is an automated email from the ASF dual-hosted git repository.
oliverwqcwrw pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 59f056d75 [ISSUE #5942] Fix the produce count include the quantity of
the system topic(#5943)
59f056d75 is described below
commit 59f056d75a317fe1f4cc94390f95bf26b87102a2
Author: Oliver <[email protected]>
AuthorDate: Tue Feb 7 09:51:00 2023 +0800
[ISSUE #5942] Fix the produce count include the quantity of the system
topic(#5943)
---
.../broker/processor/PopReviveService.java | 2 +-
.../broker/processor/ReplyMessageProcessor.java | 2 +-
.../broker/processor/SendMessageProcessor.java | 2 +-
.../broker/schedule/ScheduleMessageService.java | 2 +-
.../rocketmq/example/quickstart/Producer.java | 2 +-
.../apache/rocketmq/store/stats/BrokerStats.java | 4 ++--
.../rocketmq/store/stats/BrokerStatsManager.java | 27 ++++++++++++++++++++--
.../rocketmq/store/timer/TimerMessageStore.java | 2 +-
.../store/stats/BrokerStatsManagerTest.java | 13 +++++++++++
9 files changed, 46 insertions(+), 10 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index d0e9dbc36..95aa52091 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -131,7 +131,7 @@ public class PopReviveService extends ServiceThread {
return false;
}
this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
- this.brokerController.getBrokerStatsManager().incBrokerPutNums(1);
+
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(),
1);
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
if (brokerController.getPopMessageProcessor() != null) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index dbc87a870..b2db356c8 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -295,7 +295,7 @@ public class ReplyMessageProcessor extends
AbstractSendMessageProcessor {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(),
putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
-
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(),
putMessageResult.getAppendMessageResult().getMsgNum());
if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 45517e1bb..6faa7525b 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -429,7 +429,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(),
putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
-
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(),
putMessageResult.getAppendMessageResult().getMsgNum());
this.brokerController.getBrokerStatsManager().incTopicPutLatency(msg.getTopic(),
queueIdInt,
(int) (this.brokerController.getMessageStore().now() -
beginTimeMillis));
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 372fb83ea..26e757ab4 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -758,7 +758,7 @@ public class ScheduleMessageService extends ConfigManager {
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic,
result.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic,
result.getAppendMessageResult().getWroteBytes());
-
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic,
result.getAppendMessageResult().getMsgNum());
attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, topic)
diff --git
a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index b78c85468..2c67e463e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -54,7 +54,7 @@ public class Producer {
* </pre>
*/
// Uncomment the following line while debugging, namesrvAddr should be
set to your local address
-// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+ producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
/*
* Launch the instance.
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index d864dd50a..fb717550f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -43,7 +43,7 @@ public class BrokerStats {
this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
this.msgPutTotalTodayMorning =
-
this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+
this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
this.msgGetTotalTodayMorning =
this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopic();
@@ -84,7 +84,7 @@ public class BrokerStats {
}
public long getMsgPutTotalTodayNow() {
- return
this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+ return
this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
}
public long getMsgGetTotalTodayNow() {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 132ddc333..2dd3fc5b5 100644
---
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -76,6 +76,7 @@ public class BrokerStatsManager {
public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
public static final String BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC =
"BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC";
+ public static final String BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC =
"BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC";
public static final String SNDBCK2DLQ_TIMES = "SNDBCK2DLQ_TIMES";
public static final String COMMERCIAL_OWNER = "Owner";
@@ -190,6 +191,8 @@ public class BrokerStatsManager {
this.statsTable.put(BROKER_CK_NUMS, new StatsItemSet(BROKER_CK_NUMS,
this.scheduledExecutorService, log));
this.statsTable.put(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC,
new StatsItemSet(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC,
this.scheduledExecutorService, log));
+ this.statsTable.put(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
+ new StatsItemSet(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_FROM_DISK_NUMS,
new StatsItemSet(Stats.GROUP_GET_FROM_DISK_NUMS,
this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_FROM_DISK_SIZE,
@@ -513,11 +516,12 @@ public class BrokerStatsManager {
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
}
- public void incBrokerPutNums(final int incValue) {
+ public void incBrokerPutNums(final String topic, final int incValue) {
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
+ incBrokerPutNumsWithoutSystemTopic(topic, incValue);
}
- public void incBrokerGetNums(String topic, final int incValue) {
+ public void incBrokerGetNums(final String topic, final int incValue) {
this.statsTable.get(Stats.BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
this.incBrokerGetNumsWithoutSystemTopic(topic, incValue);
}
@@ -537,6 +541,13 @@ public class BrokerStatsManager {
this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
}
+ public void incBrokerPutNumsWithoutSystemTopic(final String topic, final
int incValue) {
+ if (TopicValidator.isSystemTopic(topic)) {
+ return;
+ }
+
this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
+ }
+
public long getBrokerGetNumsWithoutSystemTopic() {
final StatsItemSet statsItemSet =
this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC);
if (statsItemSet == null) {
@@ -549,6 +560,18 @@ public class BrokerStatsManager {
return statsItem.getValue().longValue();
}
+ public long getBrokerPutNumsWithoutSystemTopic() {
+ final StatsItemSet statsItemSet =
this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC);
+ if (statsItemSet == null) {
+ return 0;
+ }
+ final StatsItem statsItem =
statsItemSet.getStatsItem(this.clusterName);
+ if (statsItem == null) {
+ return 0;
+ }
+ return statsItem.getValue().longValue();
+ }
+
public void incSendBackNums(final String group, final String topic) {
final String statsKey = buildStatsKey(topic, group);
this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 89b93abd0..c4f7e6c77 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1050,7 +1050,7 @@ public class TimerMessageStore {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
- this.brokerStatsManager.incBrokerPutNums(1);
+
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;
case SERVICE_NOT_AVAILABLE:
diff --git
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
index c32db16dd..a602da093 100644
---
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
@@ -199,4 +199,17 @@ public class BrokerStatsManagerTest {
.getValue().doubleValue()).isEqualTo(1L);
assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopic()).isEqualTo(1L);
}
+
+ @Test
+ public void testIncBrokerPutNumsWithoutSystemTopic() {
+ brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TOPIC, 1);
+
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
CLUSTER_NAME)
+ .getValue().doubleValue()).isEqualTo(1L);
+
assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L);
+
+
brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TopicValidator.RMQ_SYS_TRACE_TOPIC,
1);
+
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
CLUSTER_NAME)
+ .getValue().doubleValue()).isEqualTo(1L);
+
assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L);
+ }
}