This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 7f7cf1b7aa4ae15fc4c685f8201dac8d5e2be4f1
Author: dongeforever <[email protected]>
AuthorDate: Thu Dec 23 17:21:33 2021 +0800

    Add test for topicStats and consumeStats
---
 .../broker/processor/AdminBrokerProcessor.java     | 25 ++++++++++++++++------
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 19 ++++++++++++++++
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  |  3 ++-
 3 files changed, 39 insertions(+), 8 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 61a8898..2891baf 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1166,6 +1166,8 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
                 continue;
             }
 
+            TopicQueueMappingDetail mappingDetail = 
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+
             {
                 SubscriptionData findSubscriptionData =
                     
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(),
 topic);
@@ -1193,17 +1195,26 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
                     requestHeader.getConsumerGroup(),
                     topic,
                     i);
-                if (consumerOffset < 0)
-                    consumerOffset = 0;
+                // the consumerOffset cannot be zero for static topic because 
of the "double read check" strategy
+                // just remain the logic for dynamic topic
+                // maybe we should remove it in the future
+                if (mappingDetail == null) {
+                    if (consumerOffset < 0)
+                        consumerOffset = 0;
+                }
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
-                long timeOffset = consumerOffset - 1;
-                if (timeOffset >= 0) {
-                    long lastTimestamp = 
this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, 
timeOffset);
-                    if (lastTimestamp > 0) {
-                        offsetWrapper.setLastTimestamp(lastTimestamp);
+                // the consumeOffset is not in this broker for static topic
+                // and may get the wrong result
+                if (mappingDetail == null) {
+                    long timeOffset = consumerOffset - 1;
+                    if (timeOffset >= 0) {
+                        long lastTimestamp = 
this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, 
timeOffset);
+                        if (lastTimestamp > 0) {
+                            offsetWrapper.setLastTimestamp(lastTimestamp);
+                        }
                     }
                 }
 
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java 
b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index bc7cf25..22443d7 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -10,6 +10,7 @@ import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -306,6 +307,15 @@ public class StaticTopicIT extends BaseConf {
         //use a new producer
         producer = getProducer(nsAddr, topic);
 
+        ConsumeStats consumeStats = 
defaultMQAdminExt.examineConsumeStats(group);
+        List<MessageQueue> messageQueues = producer.getMessageQueue();
+        for (MessageQueue queue: messageQueues) {
+            OffsetWrapper wrapper = consumeStats.getOffsetTable().get(queue);
+            Assert.assertNotNull(wrapper);
+            Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
+            Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+        }
+
         List<String> brokers = ImmutableList.of(broker2Name, broker3Name, 
broker1Name);
         for (int i = 0; i < brokers.size(); i++) {
             Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
@@ -314,6 +324,15 @@ public class StaticTopicIT extends BaseConf {
             Thread.sleep(500);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, 
msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
         }
+        consumeStats = defaultMQAdminExt.examineConsumeStats(group);
+
+        messageQueues = producer.getMessageQueue();
+        for (MessageQueue queue: messageQueues) {
+            OffsetWrapper wrapper = consumeStats.getOffsetTable().get(queue);
+            Assert.assertNotNull(wrapper);
+            Assert.assertEquals(msgEachQueue + brokers.size() * 
TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset());
+            Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+        }
         consumer = getConsumer(nsAddr, group, topic, "*", new 
RMQNormalListener());
         consumeMessagesAndCheck(producer, consumer, topic, queueNum, 
msgEachQueue, 1, brokers.size());
     }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index cd2c4ac..d915cb1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -315,6 +315,7 @@ public class MQAdminUtils {
                 if (phyOffsetWrapper == null) {
                     continue;
                 }
+
                 if (consumerOffset == -1
                     && phyOffsetWrapper.getConsumerOffset() >= 0) {
                     consumerOffset = phyOffsetWrapper.getConsumerOffset();
@@ -322,7 +323,7 @@ public class MQAdminUtils {
                 }
                 if (brokerOffset == -1
                     && item.getLogicOffset() >= 0) {
-                    brokerOffset = 
item.computeStaticQueueOffsetStrictly(offsetWrapper.getBrokerOffset());
+                    brokerOffset = 
item.computeStaticQueueOffsetStrictly(phyOffsetWrapper.getBrokerOffset());
                 }
                 if (consumerOffset >= 0
                     && brokerOffset >= 0) {

Reply via email to