This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 01761bdc7a fix transaction msg stats (#7766)
01761bdc7a is described below
commit 01761bdc7a42bcb4e6716d2e2666549b4a9e0a6c
Author: kaikoo <[email protected]>
AuthorDate: Thu Feb 29 08:21:03 2024 +0800
fix transaction msg stats (#7766)
---
.../broker/processor/EndTransactionProcessor.java | 3 +++
.../processor/EndTransactionProcessorTest.java | 22 ++++++++++++++++++++--
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index e812a53ba7..468a8791d4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -279,6 +279,9 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
+ this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+ this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
+ this.brokerController.getBrokerStatsManager().incBrokerPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum());
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index a364a1bbee..226d40ff02 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.stats.Stats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -56,6 +57,8 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class EndTransactionProcessorTest {
+ private static final String TOPIC = "trans_topic_test";
+
private EndTransactionProcessor endTransactionProcessor;
@Mock
@@ -95,20 +98,26 @@ public class EndTransactionProcessorTest {
public void testProcessRequest() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
- .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, createAppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request =
createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
RemotingCommand response =
endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS, brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+ assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS, TOPIC).getValue().sum()).isEqualTo(1L);
+ assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE, TOPIC).getValue().sum()).isEqualTo(1L);
}
@Test
public void testProcessRequest_CheckMessage() throws
RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
- .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, createAppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request =
createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
RemotingCommand response =
endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS, brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+ assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS, TOPIC).getValue().sum()).isEqualTo(1L);
+ assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE, TOPIC).getValue().sum()).isEqualTo(1L);
}
@Test
@@ -148,6 +157,7 @@ public class EndTransactionProcessorTest {
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, TOPIC);
MessageAccessor.putProperty(messageExt,
MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt,
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(messageExt,
MessageConst.PROPERTY_PRODUCER_GROUP, "testTransactionGroup");
@@ -195,4 +205,12 @@ public class EndTransactionProcessorTest {
MessageAccessor.putProperty(messageExt,
MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
return messageExt;
}
+
+ private AppendMessageResult createAppendMessageResult(AppendMessageStatus status) {
+ AppendMessageResult result = new AppendMessageResult(status);
+ result.setMsgId("12345678");
+ result.setMsgNum(1);
+ result.setWroteBytes(1);
+ return result;
+ }
}