This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-alpha in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 6172ad371384619b0200ea5424d725c4ce9400f8 Author: Hongjian Fei <[email protected]> AuthorDate: Thu Jan 6 20:07:21 2022 +0800 [ISSUE #3708] Refactor CQ and BCQ loading process and Fix some unit-tests issue. (#3713) * [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore. * [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore. * Part of ISSUE #3708. Refactor CQ and BCQ loading process. * Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio for ut. * Optimize batch message unit-test. * Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio for ut. * Optimize batch message unit-test. --- .../rocketmq/store/queue/ConsumeQueueStore.java | 127 +++++++++------------ .../store/DefaultMessageStoreCleanFilesTest.java | 28 ++--- .../store/queue/BatchConsumeMessageTest.java | 70 +++++------- .../store/queue/ConsumeQueueStoreTest.java | 100 ++++++++++++++++ .../apache/rocketmq/store/queue/QueueTestBase.java | 37 ++++-- 5 files changed, 214 insertions(+), 148 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index e8146ff..d3bfe75 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -28,7 +28,6 @@ import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.config.StorePathConfigHelper; import java.io.File; import java.util.HashMap; @@ -39,6 +38,10 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static java.lang.String.format; +import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathBatchConsumeQueue; +import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue; + public class ConsumeQueueStore { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -79,7 +82,8 @@ public class ConsumeQueueStore { * Apply the dispatched request and build the consume queue. * This function should be idempotent. * - * @param request + * @param consumeQueue consume queue + * @param request dispatch request */ public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) { FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); @@ -97,11 +101,13 @@ public class ConsumeQueueStore { } public boolean load() { - return loadConsumeQueues() && loadBatchConsumeQueues(); + boolean cqLoadResult = loadConsumeQueues(getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), CQType.SimpleCQ); + boolean bcqLoadResult = loadConsumeQueues(getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), CQType.BatchCQ); + return cqLoadResult && bcqLoadResult; } - private boolean loadBatchConsumeQueues() { - File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); + private boolean loadConsumeQueues(String storePath, CQType cqType) { + File dirLogic = new File(storePath); File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { @@ -118,24 +124,9 @@ public class ConsumeQueueStore { continue; } - TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic); - - // For batch consume queue, the topic config must exist - if (topicConfig == null) { - log.warn("topic: {} has no topic config.", topic); - continue; - } + queueTypeShouldBe(topic, cqType); - if (!Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(Optional.of(topicConfig)))) { - log.error("[BUG]topic: {} should be BCQ.", topic); - } - - ConsumeQueueInterface logic = new BatchConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.messageStoreConfig.getMappedFileSizeConsumeQueue(), - this.messageStore); + ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath); this.putConsumeQueue(topic, queueId, logic); if (!this.load(logic)) { return false; @@ -145,46 +136,39 @@ public class ConsumeQueueStore { } } - log.info("load batch consume queue all over, OK"); + log.info("load {} all over, OK", cqType); return true; } - private boolean loadConsumeQueues() { - File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); - File[] fileTopicList = dirLogic.listFiles(); - if (fileTopicList != null) { - - for (File fileTopic : fileTopicList) { - String topic = fileTopic.getName(); - - File[] fileQueueIdList = fileTopic.listFiles(); - if (fileQueueIdList != null) { - for (File fileQueueId : fileQueueIdList) { - int queueId; - try { - queueId = Integer.parseInt(fileQueueId.getName()); - } catch (NumberFormatException e) { - continue; - } - ConsumeQueueInterface logic = new ConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.messageStoreConfig.getMappedFileSizeConsumeQueue(), - this.messageStore); - this.putConsumeQueue(topic, queueId, logic); - if (!this.load(logic)) { - return false; - } - } - } - } + private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String topic, int queueId, String storePath) { + if (Objects.equals(CQType.SimpleCQ, cqType)) { + return new ConsumeQueue( + topic, + queueId, + storePath, + this.messageStoreConfig.getMappedFileSizeConsumeQueue(), + this.messageStore); + } else if (Objects.equals(CQType.BatchCQ, cqType)) { + return new BatchConsumeQueue( + topic, + queueId, + storePath, + this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(), + this.messageStore); + } else { + throw new RuntimeException(format("queue type %s is not supported.", cqType.toString())); } + } - log.info("load logics queue all over, OK"); + private void queueTypeShouldBe(String topic, CQType cqTypeExpected) { + TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic); - return true; + CQType cqTypeActual = QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig)); + + if (!Objects.equals(cqTypeExpected, cqTypeActual)) { + throw new RuntimeException(format("The queue type of topic: %s should be %s, but is %s", topic, cqTypeExpected, cqTypeActual)); + } } public void recover(ConsumeQueueInterface consumeQueue) { @@ -212,13 +196,9 @@ public class ConsumeQueueStore { } public void checkSelf() { - Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next(); - Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator(); - while (itNext.hasNext()) { - Map.Entry<Integer, ConsumeQueueInterface> cq = itNext.next(); - this.checkSelf(cq.getValue()); + for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) { + for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) { + this.checkSelf(cqEntry.getValue()); } } } @@ -292,28 +272,23 @@ public class ConsumeQueueStore { newLogic = new BatchConsumeQueue( topic, queueId, - StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(), this.messageStore); - ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic); - if (oldLogic != null) { - logic = oldLogic; - } else { - logic = newLogic; - } } else { newLogic = new ConsumeQueue( topic, queueId, - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.messageStoreConfig.getMappedFileSizeConsumeQueue(), this.messageStore); - ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic); - if (oldLogic != null) { - logic = oldLogic; - } else { - logic = newLogic; - } + } + + ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic); + if (oldLogic != null) { + logic = oldLogic; + } else { + logic = newLogic; } return logic; diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 88cf181..9dad5ea 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -47,6 +47,8 @@ import static org.apache.rocketmq.common.message.MessageDecoder.CHARSET_UTF8; import static org.apache.rocketmq.store.ConsumeQueue.CQ_STORE_UNIT_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** * Test case for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService @@ -334,26 +336,6 @@ public class DefaultMessageStoreCleanFilesTest { } } - private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService(double diskSpaceCleanForciblyRatio) - throws Exception { - Field serviceField = messageStore.getClass().getDeclaredField("cleanCommitLogService"); - serviceField.setAccessible(true); - DefaultMessageStore.CleanCommitLogService cleanCommitLogService = - (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore); - serviceField.setAccessible(false); - - Field warningLevelRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceWarningLevelRatio"); - warningLevelRatioField.setAccessible(true); - warningLevelRatioField.set(cleanCommitLogService, String.valueOf(diskSpaceCleanForciblyRatio)); - warningLevelRatioField.setAccessible(false); - - Field cleanForciblyRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceCleanForciblyRatio"); - cleanForciblyRatioField.setAccessible(true); - cleanForciblyRatioField.set(cleanCommitLogService, String.valueOf(diskSpaceCleanForciblyRatio)); - cleanForciblyRatioField.setAccessible(false); - return cleanCommitLogService; - } - private DefaultMessageStore.CleanConsumeQueueService getCleanConsumeQueueService() throws Exception { Field serviceField = messageStore.getClass().getDeclaredField("cleanConsumeQueueService"); @@ -490,11 +472,15 @@ public class DefaultMessageStoreCleanFilesTest { messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); - cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio); cleanConsumeQueueService = getCleanConsumeQueueService(); assertTrue(messageStore.load()); messageStore.start(); + + // partially mock a real obj + cleanCommitLogService = spy(cleanCommitLogService); + when(cleanCommitLogService.getDiskSpaceWarningLevelRatio()).thenReturn(diskSpaceCleanForciblyRatio); + when(cleanCommitLogService.getDiskSpaceCleanForciblyRatio()).thenReturn(diskSpaceCleanForciblyRatio); } private class MyMessageArrivingListener implements MessageArrivingListener { diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java index 500cd81..bc5f896 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java @@ -17,8 +17,6 @@ package org.apache.rocketmq.store.queue; -import org.apache.rocketmq.common.TopicAttributes; -import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.message.MessageAccessor; @@ -27,7 +25,6 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.utils.QueueTypeUtils; -import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -43,16 +40,11 @@ import org.junit.Test; import java.io.File; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.Random; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; @@ -173,8 +165,9 @@ public class BatchConsumeMessageTest extends QueueTestBase { long pullOffset = 0L; int getMessageCount = 0; + int atMostMsgNum = 1; while (true) { - GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, 1, null); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, atMostMsgNum, null); if (Objects.equals(getMessageResult.getStatus(), GetMessageStatus.OFFSET_OVERFLOW_ONE)) { break; } @@ -229,8 +222,9 @@ public class BatchConsumeMessageTest extends QueueTestBase { String topic = "TestDispatchBuildConsumeQueue"; createTopic(topic, CQType.SimpleCQ, messageStore); - long timeStart = System.currentTimeMillis(); + long timeStart = -1; long timeMid = -1; + long commitLogMid = -1; for (int i = 0; i < 100; i++) { MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, -1); @@ -238,8 +232,14 @@ public class BatchConsumeMessageTest extends QueueTestBase { Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); Thread.sleep(2); - if (i == 49) - timeMid = System.currentTimeMillis(); + if (i == 0) { + timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + } + if (i == 50) { + timeMid = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + commitLogMid = putMessageResult.getAppendMessageResult().getWroteOffset(); + } + } await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); @@ -261,16 +261,14 @@ public class BatchConsumeMessageTest extends QueueTestBase { } //check the message time - long latencyAllowed = 20; - long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0); - Assert.assertTrue(earlistMessageTime > timeStart - latencyAllowed); - Assert.assertTrue(earlistMessageTime < timeStart + latencyAllowed); + long earliestMessageTime = messageStore.getEarliestMessageTime(topic, 0); + Assert.assertEquals(timeStart, earliestMessageTime); long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 50); - Assert.assertTrue(messageStoreTime > timeMid - latencyAllowed); - Assert.assertTrue(messageStoreTime < timeMid + latencyAllowed); + Assert.assertEquals(timeMid, messageStoreTime); long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 50); Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset()); Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset()); + Assert.assertEquals(commitLogMid, commitLogOffset); Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 50)); } @@ -279,7 +277,7 @@ public class BatchConsumeMessageTest extends QueueTestBase { public void testDispatchBuildBatchConsumeQueue() throws Exception { String topic = "testDispatchBuildBatchConsumeQueue"; int batchNum = 10; - long timeStart = System.currentTimeMillis(); + long timeStart = -1; long timeMid = -1; createTopic(topic, CQType.BatchCQ, messageStore); @@ -288,8 +286,13 @@ public class BatchConsumeMessageTest extends QueueTestBase { PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum)); Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); Thread.sleep(2); - if (i == 29) - timeMid = System.currentTimeMillis(); + if (i == 0) { + timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + } + if (i == 30) { + timeMid = putMessageResult.getAppendMessageResult().getStoreTimestamp();; + } + } await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); @@ -309,12 +312,10 @@ public class BatchConsumeMessageTest extends QueueTestBase { } //check the message time - long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0); - Assert.assertTrue(earlistMessageTime > timeStart - 20); - Assert.assertTrue(earlistMessageTime < timeStart + 20); + long earliestMessageTime = messageStore.getEarliestMessageTime(topic, 0); + Assert.assertEquals(earliestMessageTime, timeStart); long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 300); - Assert.assertTrue(messageStoreTime > timeMid - 20); - Assert.assertTrue(messageStoreTime < timeMid + 20); + Assert.assertEquals(messageStoreTime, timeMid); long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 300); Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset()); Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset()); @@ -450,21 +451,4 @@ public class BatchConsumeMessageTest extends QueueTestBase { } } - private void createTopic(String topic, CQType cqType, MessageStore messageStore) { - ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); - TopicConfig topicConfigToBeAdded = new TopicConfig(); - - Map<String, String> attributes = new HashMap<>(); - attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString()); - topicConfigToBeAdded.setTopicName(topic); - topicConfigToBeAdded.setAttributes(attributes); - - topicConfigTable.put(topic, topicConfigToBeAdded); - ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable); - } - - private Callable<Boolean> fullyDispatched(MessageStore messageStore) { - return () -> messageStore.dispatchBehindBytes() == 0; - } - } diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java new file mode 100644 index 0000000..a8379fc --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.queue; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.UUID; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; + +public class ConsumeQueueStoreTest extends QueueTestBase { + private MessageStore messageStore; + + @Before + public void init() throws Exception { + messageStore = createMessageStore(null, true); + messageStore.load(); + messageStore.start(); + } + + @After + public void destroy() { + messageStore.shutdown(); + messageStore.destroy(); + + File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir()); + UtilAll.deleteFile(file); + } + + @Test + public void testLoadConsumeQueuesWithWrongAttribute() { + String normalTopic = UUID.randomUUID().toString(); + createTopic(normalTopic, CQType.SimpleCQ, messageStore); + + for (int i = 0; i < 10; i++) { + PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(normalTopic, -1)); + assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + } + + await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); + + // simulate delete topic but with files left. + ((DefaultMessageStore)messageStore).setTopicConfigTable(null); + + createTopic(normalTopic, CQType.BatchCQ, messageStore); + messageStore.shutdown(); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load()); + Assert.assertTrue(runtimeException.getMessage().endsWith("should be SimpleCQ, but is BatchCQ")); + } + + @Test + public void testLoadBatchConsumeQueuesWithWrongAttribute() { + String batchTopic = UUID.randomUUID().toString(); + createTopic(batchTopic, CQType.BatchCQ, messageStore); + + for (int i = 0; i < 10; i++) { + PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(batchTopic, 10)); + assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + } + + await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); + + // simulate delete topic but with files left. + ((DefaultMessageStore)messageStore).setTopicConfigTable(null); + + createTopic(batchTopic, CQType.SimpleCQ, messageStore); + messageStore.shutdown(); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load()); + Assert.assertTrue(runtimeException.getMessage().endsWith("should be BatchCQ, but is SimpleCQ")); + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java index c16d7cb..506cbd6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.store.queue; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.TopicAttributes; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -31,10 +34,31 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; import java.io.File; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class QueueTestBase extends StoreTestBase { + protected void createTopic(String topic, CQType cqType, MessageStore messageStore) { + ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); + TopicConfig topicConfigToBeAdded = new TopicConfig(); + + Map<String, String> attributes = new HashMap<>(); + attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString()); + topicConfigToBeAdded.setTopicName(topic); + topicConfigToBeAdded.setAttributes(attributes); + + topicConfigTable.put(topic, topicConfigToBeAdded); + ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable); + } + + protected Callable<Boolean> fullyDispatched(MessageStore messageStore) { + return () -> messageStore.dispatchBehindBytes() == 0; + } + protected MessageStore createMessageStore(String baseDir, boolean extent) throws Exception { if (baseDir == null) { baseDir = createBaseDir(); @@ -58,14 +82,11 @@ public class QueueTestBase extends StoreTestBase { messageStoreConfig.setFlushIntervalCommitLog(1); messageStoreConfig.setFlushCommitLogThoroughInterval(2); - DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() { - @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map<String, String> properties) { - - } - }, new BrokerConfig()); - return messageStore; + return new DefaultMessageStore( + messageStoreConfig, + new BrokerStatsManager("simpleTest"), + (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {}, + new BrokerConfig()); } public MessageExtBrokerInner buildMessage(String topic, int batchNum) {
