This is an automated email from the ASF dual-hosted git repository. dongeforever pushed a commit to branch 5.0.0-alpha-static-topic in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 7f7cf1b7aa4ae15fc4c685f8201dac8d5e2be4f1 Author: dongeforever <[email protected]> AuthorDate: Thu Dec 23 17:21:33 2021 +0800 Add test for topicStats and consumeStats --- .../broker/processor/AdminBrokerProcessor.java | 25 ++++++++++++++++------ .../rocketmq/test/statictopic/StaticTopicIT.java | 19 ++++++++++++++++ .../apache/rocketmq/tools/admin/MQAdminUtils.java | 3 ++- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 61a8898..2891baf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1166,6 +1166,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements continue; } + TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); + { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); @@ -1193,17 +1195,26 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements requestHeader.getConsumerGroup(), topic, i); - if (consumerOffset < 0) - consumerOffset = 0; + // the consumerOffset cannot be zero for static topic because of the "double read check" strategy + // just remain the logic for dynamic topic + // maybe we should remove it in the future + if (mappingDetail == null) { + if (consumerOffset < 0) + consumerOffset = 0; + } offsetWrapper.setBrokerOffset(brokerOffset); offsetWrapper.setConsumerOffset(consumerOffset); - long timeOffset = consumerOffset - 1; - if (timeOffset >= 0) { - long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); - if (lastTimestamp > 0) { - offsetWrapper.setLastTimestamp(lastTimestamp); + // the consumeOffset is not in this broker for static topic + // and may get the wrong result + if (mappingDetail == null) { + long timeOffset = consumerOffset - 1; + if (timeOffset >= 0) { + long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); + if (lastTimestamp > 0) { + offsetWrapper.setLastTimestamp(lastTimestamp); + } } } diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java index bc7cf25..22443d7 100644 --- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java @@ -10,6 +10,7 @@ import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -306,6 +307,15 @@ public class StaticTopicIT extends BaseConf { //use a new producer producer = getProducer(nsAddr, topic); + ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(group); + List<MessageQueue> messageQueues = producer.getMessageQueue(); + for (MessageQueue queue: messageQueues) { + OffsetWrapper wrapper = consumeStats.getOffsetTable().get(queue); + Assert.assertNotNull(wrapper); + Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset()); + Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset()); + } + List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name); for (int i = 0; i < brokers.size(); i++) { Set<String> targetBrokers = ImmutableSet.of(brokers.get(i)); @@ -314,6 +324,15 @@ public class StaticTopicIT extends BaseConf { Thread.sleep(500); sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE); } + consumeStats = defaultMQAdminExt.examineConsumeStats(group); + + messageQueues = producer.getMessageQueue(); + for (MessageQueue queue: messageQueues) { + OffsetWrapper wrapper = consumeStats.getOffsetTable().get(queue); + Assert.assertNotNull(wrapper); + Assert.assertEquals(msgEachQueue + brokers.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset()); + Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset()); + } consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size()); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java index cd2c4ac..d915cb1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -315,6 +315,7 @@ public class MQAdminUtils { if (phyOffsetWrapper == null) { continue; } + if (consumerOffset == -1 && phyOffsetWrapper.getConsumerOffset() >= 0) { consumerOffset = phyOffsetWrapper.getConsumerOffset(); @@ -322,7 +323,7 @@ public class MQAdminUtils { } if (brokerOffset == -1 && item.getLogicOffset() >= 0) { - brokerOffset = item.computeStaticQueueOffsetStrictly(offsetWrapper.getBrokerOffset()); + brokerOffset = item.computeStaticQueueOffsetStrictly(phyOffsetWrapper.getBrokerOffset()); } if (consumerOffset >= 0 && brokerOffset >= 0) {
