This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-auto-batch
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-auto-batch by this 
push:
     new 60c1823f1 Fix the problem of abnormal statistics under Bcq (#4567)
60c1823f1 is described below

commit 60c1823f188a3df1a8d604e2429994b5974dd110
Author: guyinyou <[email protected]>
AuthorDate: Thu Jul 7 16:04:16 2022 +0800

    Fix the problem of abnormal statistics under Bcq (#4567)
    
    * Fix autoBatch not compatible with batchConsumeQueue
    
    * Fix the problem of abnormal statistics under Bcq
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java          | 2 +-
 .../src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index be013e9d4..7baa9c6e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -907,7 +907,7 @@ public class CommitLog implements Swappable {
         PutMessageResult putMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, result);
 
         // Statistics
-        
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
+        
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
         
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
 
         return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, 
needHandleHA);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0da12a232..05b486d8d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -804,7 +804,7 @@ public class DefaultMessageStore implements MessageStore {
                                 continue;
                             }
 
-                            
this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
+                            
this.storeStatsService.getGetMessageTransferedMsgCount().add(cqUnit.getBatchNum());
                             getResult.addMessage(selectResult, 
cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                             status = GetMessageStatus.FOUND;
                             nextPhyFileStartOffset = Long.MIN_VALUE;
@@ -2457,7 +2457,7 @@ public class DefaultMessageStore implements MessageStore {
                                     if 
(!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
                                         
DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == 
BrokerRole.SLAVE) {
                                         
DefaultMessageStore.this.storeStatsService
-                                            
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                                            
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(dispatchRequest.getBatchSize());
                                         
DefaultMessageStore.this.storeStatsService
                                             
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                             .add(dispatchRequest.getMsgSize());

Reply via email to