This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ba53f9364fbfc69c56bef25f1f771e61bcfc01d0
Author: dongeforever <[email protected]>
AuthorDate: Wed Jan 5 17:11:31 2022 +0800

    Fix test for consumer offset
---
 .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 10 ++++++----
 .../org/apache/rocketmq/test/statictopic/StaticTopicIT.java | 3 +++
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 568a728..5b8a19f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,6 +1176,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 continue;
             }
 
+            TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1206,14 +1208,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 // the consumerOffset cannot be zero for static topic because of the "double read check" strategy
                 // just remain the logic for dynamic topic
                 // maybe we should remove it in the future
-                if (consumerOffset < 0)
-                    consumerOffset = 0;
+                if (mappingDetail == null) {
+                    if (consumerOffset < 0)
+                        consumerOffset = 0;
+                }
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
-                // the consumeOffset is not in this broker for static topic
-                // and may get the wrong result
                 long timeOffset = consumerOffset - 1;
                 if (timeOffset >= 0) {
                     long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 3e8f146..5b3e5fe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -292,6 +292,7 @@ public class StaticTopicIT extends BaseConf {
         String group = initConsumerGroup();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+        long start = System.currentTimeMillis();
 
         int queueNum = 10;
         int msgEachQueue = 100;
@@ -314,6 +315,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertNotNull(wrapper);
             Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
             Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+            Assert.assertTrue(wrapper.getLastTimestamp() > start);
         }
 
         List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
@@ -332,6 +334,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertNotNull(wrapper);
             Assert.assertEquals(msgEachQueue + brokers.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset());
             Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+            Assert.assertTrue(wrapper.getLastTimestamp() > start);
         }
         consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
         consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
