This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 01761bdc7a fix transaction msg stats (#7766)
01761bdc7a is described below

commit 01761bdc7a42bcb4e6716d2e2666549b4a9e0a6c
Author: kaikoo <[email protected]>
AuthorDate: Thu Feb 29 08:21:03 2024 +0800

    fix transaction msg stats (#7766)
---
 .../broker/processor/EndTransactionProcessor.java  |  3 +++
 .../processor/EndTransactionProcessorTest.java     | 22 ++++++++++++++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index e812a53ba7..468a8791d4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -279,6 +279,9 @@ public class EndTransactionProcessor implements 
NettyRequestProcessor {
             switch (putMessageResult.getPutMessageStatus()) {
                 // Success
                 case PUT_OK:
+                    
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(),
 putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+                    
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
 putMessageResult.getAppendMessageResult().getWroteBytes());
+                    
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msgInner.getTopic(),
 putMessageResult.getAppendMessageResult().getMsgNum());
                 case FLUSH_DISK_TIMEOUT:
                 case FLUSH_SLAVE_TIMEOUT:
                 case SLAVE_NOT_AVAILABLE:
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index a364a1bbee..226d40ff02 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.stats.Stats;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -56,6 +57,8 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public class EndTransactionProcessorTest {
 
+    private static final String TOPIC = "trans_topic_test";
+
     private EndTransactionProcessor endTransactionProcessor;
 
     @Mock
@@ -95,20 +98,26 @@ public class EndTransactionProcessorTest {
     public void testProcessRequest() throws RemotingCommandException {
         
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
         when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
-                .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+                .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, 
createAppendMessageResult(AppendMessageStatus.PUT_OK)));
         RemotingCommand request = 
createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
         RemotingCommand response = 
endTransactionProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS,
 
brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+        
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS,
 TOPIC).getValue().sum()).isEqualTo(1L);
+        
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE,
 TOPIC).getValue().sum()).isEqualTo(1L);
     }
 
     @Test
     public void testProcessRequest_CheckMessage() throws 
RemotingCommandException {
         
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
         when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
-                .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+                .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, 
createAppendMessageResult(AppendMessageStatus.PUT_OK)));
         RemotingCommand request = 
createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
         RemotingCommand response = 
endTransactionProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS,
 
brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+        
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS,
 TOPIC).getValue().sum()).isEqualTo(1L);
+        
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE,
 TOPIC).getValue().sum()).isEqualTo(1L);
     }
 
     @Test
@@ -148,6 +157,7 @@ public class EndTransactionProcessorTest {
         messageExt.setQueueId(0);
         messageExt.setCommitLogOffset(123456789L);
         messageExt.setQueueOffset(1234);
+        MessageAccessor.putProperty(messageExt, 
MessageConst.PROPERTY_REAL_TOPIC, TOPIC);
         MessageAccessor.putProperty(messageExt, 
MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
         MessageAccessor.putProperty(messageExt, 
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
         MessageAccessor.putProperty(messageExt, 
MessageConst.PROPERTY_PRODUCER_GROUP, "testTransactionGroup");
@@ -195,4 +205,12 @@ public class EndTransactionProcessorTest {
         MessageAccessor.putProperty(messageExt, 
MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
         return messageExt;
     }
+
+    private AppendMessageResult createAppendMessageResult(AppendMessageStatus 
status) {
+        AppendMessageResult result = new AppendMessageResult(status);
+        result.setMsgId("12345678");
+        result.setMsgNum(1);
+        result.setWroteBytes(1);
+        return result;
+    }
 }

Reply via email to