This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3fe81bfbac [ISSUE #6545] Remove getTopicConfigs method in interface 
MessageStore (#6531)
3fe81bfbac is described below

commit 3fe81bfbac11ee9a3bf9c21155d2c9c69570bb72
Author: Ao Qiao <[email protected]>
AuthorDate: Sat Apr 15 18:22:06 2023 +0800

    [ISSUE #6545] Remove getTopicConfigs method in interface MessageStore 
(#6531)
    
    * change map -> lambda
    
    * f
    
    * fix unit test
    
    * remove getTopicConfig function
    
    * Update MultiDispatchTest.java
    
    * Update CompactionStore.java
    
    * update
    
    * update test
    
    * update test
    
    * Update BatchConsumeMessageTest.java
    
    * Update BrokerController.java
    
    * Update BrokerController.java
    
    * check
    
    * Update BrokerController.java
    
    * Update BatchConsumeMessageTest.java
---
 .../apache/rocketmq/broker/BrokerController.java   |  3 +-
 .../broker/filter/MessageStoreWithFilterTest.java  |  3 +-
 .../schedule/ScheduleMessageServiceTest.java       |  2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java | 18 +++++-----
 .../org/apache/rocketmq/store/MessageStore.java    | 18 ----------
 .../apache/rocketmq/store/kv/CompactionStore.java  |  6 ++--
 .../store/plugin/AbstractPluginMessageStore.java   | 13 -------
 .../rocketmq/store/queue/ConsumeQueueStore.java    | 26 ++------------
 .../apache/rocketmq/store/AppendCallbackTest.java  |  3 +-
 .../apache/rocketmq/store/BatchPutMessageTest.java |  3 +-
 .../apache/rocketmq/store/ConsumeQueueTest.java    |  5 +--
 .../store/DefaultMessageStoreCleanFilesTest.java   |  3 +-
 .../store/DefaultMessageStoreShutDownTest.java     |  3 +-
 .../rocketmq/store/DefaultMessageStoreTest.java    |  4 +--
 .../java/org/apache/rocketmq/store/HATest.java     |  3 +-
 .../apache/rocketmq/store/MultiDispatchTest.java   |  3 +-
 .../store/dledger/DLedgerMultiPathTest.java        |  3 +-
 .../store/dledger/MessageStoreTestBase.java        |  5 +--
 .../store/ha/autoswitch/AutoSwitchHATest.java      |  3 +-
 .../store/queue/BatchConsumeMessageTest.java       | 41 +++++++++++++++-------
 .../store/queue/BatchConsumeQueueTest.java         |  3 +-
 .../store/queue/ConsumeQueueStoreTest.java         | 27 +++++++++-----
 .../rocketmq/store/queue/ConsumeQueueTest.java     |  5 +--
 .../apache/rocketmq/store/queue/QueueTestBase.java |  8 ++---
 .../store/timer/TimerMessageStoreTest.java         |  3 +-
 25 files changed, 101 insertions(+), 113 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 70e59e098d..a35618dc0a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -737,8 +737,7 @@ public class BrokerController {
 
         if (result) {
             try {
-                DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
this.messageArrivingListener, this.brokerConfig);
-                
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());
+                DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
this.messageArrivingListener, this.brokerConfig, 
topicConfigManager.getTopicConfigTable());
 
                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
                     DLedgerRoleChangeHandler roleChangeHandler = new 
DLedgerRoleChangeHandler(this, defaultMessageStore);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index 678c5079df..84bca91699 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
@@ -159,7 +160,7 @@ public class MessageStoreWithFilterTest {
                                      long msgStoreTime, byte[] filterBitMap, 
Map<String, String> properties) {
                 }
             }
-            , brokerConfig);
+            , brokerConfig, new ConcurrentHashMap<>());
 
         master.getDispatcherList().addFirst(new CommitLogDispatcher() {
             @Override
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
index d3f6753d23..b90fb2931d 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
@@ -119,7 +119,7 @@ public class ScheduleMessageServiceTest {
 
         brokerConfig = new BrokerConfig();
         BrokerStatsManager manager = new 
BrokerStatsManager(brokerConfig.getBrokerClusterName(), 
brokerConfig.isEnableDetailStat());
-        messageStore = new DefaultMessageStore(messageStoreConfig, manager, 
new MyMessageArrivingListener(), new BrokerConfig());
+        messageStore = new DefaultMessageStore(messageStoreConfig, manager, 
new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
 
         assertThat(messageStore.load()).isTrue();
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e51132bbf2..73b0f42e58 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -200,16 +200,20 @@ public class DefaultMessageStore implements MessageStore {
 
     private long stateMachineVersion = 0L;
 
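+    // Treat this map as read-only: it is the caller's table passed in by reference (not copied or wrapped), so do not modify it here.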
+    private ConcurrentMap<String, TopicConfig> topicConfigTable;
+
     private final ScheduledExecutorService scheduledCleanQueueExecutorService =
         Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
 
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, 
final BrokerStatsManager brokerStatsManager,
-        final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig) throws IOException {
+        final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> 
topicConfigTable) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
         this.brokerConfig = brokerConfig;
         this.messageStoreConfig = messageStoreConfig;
         this.aliveReplicasNum = messageStoreConfig.getTotalReplicas();
         this.brokerStatsManager = brokerStatsManager;
+        this.topicConfigTable = topicConfigTable;
         this.allocateMappedFileService = new AllocateMappedFileService(this);
         if (messageStoreConfig.isEnableDLegerCommitLog()) {
             this.commitLog = new DLedgerCommitLog(this);
@@ -2047,18 +2051,16 @@ public class DefaultMessageStore implements 
MessageStore {
         }
     }
 
-    @Override
     public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
-        return this.consumeQueueStore.getTopicConfigs();
+        return this.topicConfigTable;
     }
 
-    @Override
     public Optional<TopicConfig> getTopicConfig(String topic) {
-        return this.consumeQueueStore.getTopicConfig(topic);
-    }
+        if (this.topicConfigTable == null) {
+            return Optional.empty();
+        }
 
-    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> 
topicConfigTable) {
-        this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
+        return Optional.ofNullable(this.topicConfigTable.get(topic));
     }
 
     public BrokerIdentity getBrokerIdentity() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index f77739fc47..a7da245551 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -25,15 +25,12 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.SystemClock;
-import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -736,21 +733,6 @@ public interface MessageStore {
      */
     void assignOffset(MessageExtBrokerInner msg, short messageNum);
 
-    /**
-     * get all topic config
-     *
-     * @return all topic config info
-     */
-    Map<String, TopicConfig> getTopicConfigs();
-
-    /**
-     * get topic config
-     *
-     * @param topic topic name
-     * @return topic config info
-     */
-    Optional<TopicConfig> getTopicConfig(String topic);
-
     /**
      * Get master broker message store in process in broker container
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java 
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b6d73b81c7..3ba37df957 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -29,9 +29,9 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
@@ -53,7 +53,7 @@ public class CompactionStore {
     private final String compactionPath;
     private final String compactionLogPath;
     private final String compactionCqPath;
-    private final MessageStore defaultMessageStore;
+    private final DefaultMessageStore defaultMessageStore;
     private final CompactionPositionMgr positionMgr;
     private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
     private final ScheduledExecutorService compactionSchedule;
@@ -65,7 +65,7 @@ public class CompactionStore {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
-    public CompactionStore(MessageStore defaultMessageStore) {
+    public CompactionStore(DefaultMessageStore defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
         this.compactionLogTable = new ConcurrentHashMap<>();
         MessageStoreConfig config = 
defaultMessageStore.getMessageStoreConfig();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 3f43adc12d..89c3e53b6b 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -26,15 +26,12 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.SystemClock;
-import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -595,16 +592,6 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
         next.assignOffset(msg, messageNum);
     }
 
-    @Override
-    public Map<String, TopicConfig> getTopicConfigs() {
-        return next.getTopicConfigs();
-    }
-
-    @Override
-    public Optional<TopicConfig> getTopicConfig(String topic) {
-        return next.getTopicConfig(topic);
-    }
-
     @Override
     public List<PutMessageHook> getPutMessageHookList() {
         return next.getPutMessageHookList();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 082e7bbb38..7d7878f129 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -64,20 +64,12 @@ public class ConsumeQueueStore {
     protected final QueueOffsetAssigner queueOffsetAssigner = new 
QueueOffsetAssigner();
     protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* 
queueId */, ConsumeQueueInterface>> consumeQueueTable;
 
-    // Should be careful, do not change the topic config
-    // TopicConfigManager is more suitable here.
-    private ConcurrentMap<String, TopicConfig> topicConfigTable;
-
     public ConsumeQueueStore(DefaultMessageStore messageStore, 
MessageStoreConfig messageStoreConfig) {
         this.messageStore = messageStore;
         this.messageStoreConfig = messageStoreConfig;
         this.consumeQueueTable = new ConcurrentHashMap<>(32);
     }
 
-    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> 
topicConfigTable) {
-        this.topicConfigTable = topicConfigTable;
-    }
-
     private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
         return findOrCreateConsumeQueue(topic, queueId);
     }
@@ -173,9 +165,9 @@ public class ConsumeQueueStore {
     }
 
     private void queueTypeShouldBe(String topic, CQType cqTypeExpected) {
-        TopicConfig topicConfig = this.topicConfigTable == null ? null : 
this.topicConfigTable.get(topic);
+        Optional<TopicConfig> topicConfig = 
this.messageStore.getTopicConfig(topic);
 
-        CQType cqTypeActual = 
QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig));
+        CQType cqTypeActual = QueueTypeUtils.getCQType(topicConfig);
 
         if (!Objects.equals(cqTypeExpected, cqTypeActual)) {
             throw new RuntimeException(format("The queue type of topic: %s 
should be %s, but is %s", topic, cqTypeExpected, cqTypeActual));
@@ -341,7 +333,7 @@ public class ConsumeQueueStore {
 
         ConsumeQueueInterface newLogic;
 
-        Optional<TopicConfig> topicConfig = this.getTopicConfig(topic);
+        Optional<TopicConfig> topicConfig = 
this.messageStore.getTopicConfig(topic);
         // TODO maybe the topic has been deleted.
         if (Objects.equals(CQType.BatchCQ, 
QueueTypeUtils.getCQType(topicConfig))) {
             newLogic = new BatchConsumeQueue(
@@ -537,18 +529,6 @@ public class ConsumeQueueStore {
         }
     }
 
-    public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
-        return this.topicConfigTable;
-    }
-
-    public Optional<TopicConfig> getTopicConfig(String topic) {
-        if (this.topicConfigTable == null) {
-            return Optional.empty();
-        }
-
-        return Optional.ofNullable(this.topicConfigTable.get(topic));
-    }
-
     public long getTotalSize() {
         long totalSize = 0;
         for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
this.consumeQueueTable.values()) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index dc1af78b3f..87bfe85da2 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
@@ -55,7 +56,7 @@ public class AppendCallbackTest {
         
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore");
         
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore" + File.separator + "commitlog");
         //too much reference
-        DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig());
+        DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new 
ConcurrentHashMap<>());
         CommitLog commitLog = new CommitLog(messageStore);
         callback = commitLog.new DefaultAppendMessageCallback();
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java 
b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 8332f38c3c..43ca38eb48 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
@@ -78,7 +79,7 @@ public class BatchPutMessageTest {
         
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + 
File.separator
             + "putmessagesteststore" + File.separator + "commitlog");
         messageStoreConfig.setHaListenPort(0);
-        return new DefaultMessageStore(messageStoreConfig, new 
BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new 
BrokerConfig());
+        return new DefaultMessageStore(messageStoreConfig, new 
BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new 
BrokerConfig(), new ConcurrentHashMap<>());
     }
 
     @Test
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index d80a6f25fa..2e08369bde 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
@@ -151,7 +152,7 @@ public class ConsumeQueueTest {
                     long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
                 }
             }
-            , brokerConfig);
+            , brokerConfig, new ConcurrentHashMap<>());
 
         assertThat(master.load()).isTrue();
 
@@ -179,7 +180,7 @@ public class ConsumeQueueTest {
                     long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
                 }
             }
-            , brokerConfig);
+            , brokerConfig, new ConcurrentHashMap<>());
 
         assertThat(master.load()).isTrue();
 
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 601d50c0f5..083aabc48b 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.store;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
@@ -483,7 +484,7 @@ public class DefaultMessageStoreCleanFilesTest {
 
     private void initMessageStore(MessageStoreConfig messageStoreConfig, 
double diskSpaceCleanForciblyRatio) throws Exception {
         messageStore = new DefaultMessageStore(messageStoreConfig,
-                new BrokerStatsManager("test", true), new 
MyMessageArrivingListener(), new BrokerConfig());
+                new BrokerStatsManager("test", true), new 
MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
 
         cleanCommitLogService = getCleanCommitLogService();
         cleanConsumeQueueService = getCleanConsumeQueueService();
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index 7329098a38..515a4845a4 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.store;
 
 import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.store.config.FlushDiskType;
@@ -74,7 +75,7 @@ public class DefaultMessageStoreShutDownTest {
         String storeRootPath = System.getProperty("java.io.tmpdir") + 
File.separator + "store";
         messageStoreConfig.setStorePathRootDir(storeRootPath);
         messageStoreConfig.setHaListenPort(0);
-        return new DefaultMessageStore(messageStoreConfig, new 
BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+        return new DefaultMessageStore(messageStoreConfig, new 
BrokerStatsManager("simpleTest", true), null, new BrokerConfig(), new 
ConcurrentHashMap<>());
     }
 
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 4675157166..2f22de4d11 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -99,7 +99,7 @@ public class DefaultMessageStoreTest {
         messageStoreConfig.setMaxIndexNum(100 * 10);
         
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + 
File.separator + "store");
         messageStoreConfig.setHaListenPort(0);
-        MessageStore master = new DefaultMessageStore(messageStoreConfig, 
null, new MyMessageArrivingListener(), new BrokerConfig());
+        MessageStore master = new DefaultMessageStore(messageStoreConfig, 
null, new MyMessageArrivingListener(), new BrokerConfig(), new 
ConcurrentHashMap<>());
 
         boolean load = master.load();
         assertTrue(load);
@@ -144,7 +144,7 @@ public class DefaultMessageStoreTest {
         return new DefaultMessageStore(messageStoreConfig,
             new BrokerStatsManager("simpleTest", true),
             new MyMessageArrivingListener(),
-            new BrokerConfig());
+            new BrokerConfig(), new ConcurrentHashMap<>());
     }
 
     @Test
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java 
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index e1dc16bdc1..38a0435817 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -27,6 +27,7 @@ import java.net.SocketAddress;
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
@@ -250,7 +251,7 @@ public class HATest {
     private MessageStore buildMessageStore(MessageStoreConfig 
messageStoreConfig, long brokerId) throws Exception {
         BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setBrokerId(brokerId);
-        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, 
null, brokerConfig);
+        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, 
null, brokerConfig, new ConcurrentHashMap<>());
     }
 
     private void buildMessageStoreConfig(MessageStoreConfig 
messageStoreConfig) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java 
b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index daa17eef88..3ae4b2be56 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -57,7 +58,7 @@ public class MultiDispatchTest {
         messageStoreConfig.setEnableMultiDispatch(true);
         BrokerConfig brokerConfig = new BrokerConfig();
         //too much reference
-        messageStore = new DefaultMessageStore(messageStoreConfig, null, null, 
brokerConfig);
+        messageStore = new DefaultMessageStore(messageStoreConfig, null, null, 
brokerConfig, new ConcurrentHashMap<>());
         consumeQueue = new ConsumeQueue("xxx", 0,
             
getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), 
messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
index ebeba5013a..5eb8320732 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -107,7 +108,7 @@ public class DLedgerMultiPathTest extends 
MessageStoreTestBase {
         storeConfig.setdLegerSelfId(selfId);
         DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitLogTest", 
true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, 
properties) -> {
 
-        }, new BrokerConfig());
+        }, new BrokerConfig(), new ConcurrentHashMap<>());
         Assert.assertTrue(defaultMessageStore.load());
         defaultMessageStore.start();
         return defaultMessageStore;
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
 
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index 3ae0cb64ec..a21806ffcf 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -21,6 +21,7 @@ import io.openmessaging.storage.dledger.DLedgerServer;
 import java.io.File;
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -60,7 +61,7 @@ public class MessageStoreTestBase extends StoreTestBase {
         storeConfig.setRecheckReputOffsetFromCq(true);
         DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(storeConfig,  new 
BrokerStatsManager("DLedgerCommitlogTest", true), (topic, queueId, logicOffset, 
tagsCode, msgStoreTime, filterBitMap, properties) -> {
 
-        }, new BrokerConfig());
+        }, new BrokerConfig(), new ConcurrentHashMap<>());
         DLedgerServer dLegerServer = ((DLedgerCommitLog) 
defaultMessageStore.getCommitLog()).getdLedgerServer();
         if (leaderId != null) {
             dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
@@ -109,7 +110,7 @@ public class MessageStoreTestBase extends StoreTestBase {
         storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
         DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(storeConfig,  new BrokerStatsManager("CommitlogTest", 
true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, 
properties) -> {
 
-        }, new BrokerConfig());
+        }, new BrokerConfig(), new ConcurrentHashMap<>());
 
         if (createAbort) {
             String fileName = 
StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 3c4e7af8d5..6d105289f0 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
@@ -497,7 +498,7 @@ public class AutoSwitchHATest {
         BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setBrokerId(brokerId);
         brokerConfig.setEnableControllerMode(true);
-        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, 
null, brokerConfig);
+        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, 
null, brokerConfig, new ConcurrentHashMap<>());
     }
 
     private void buildMessageStoreConfig(MessageStoreConfig 
messageStoreConfig, int mappedFileSize) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 10c2454afa..e3ac1b6bda 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -27,6 +27,9 @@ import java.util.Objects;
 import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -37,10 +40,10 @@ import 
org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageFilter;
-import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -55,11 +58,13 @@ import static org.awaitility.Awaitility.await;
 public class BatchConsumeMessageTest extends QueueTestBase {
     private static final int BATCH_NUM = 10;
     private static final int TOTAL_MSGS = 200;
-    private MessageStore messageStore;
+    private DefaultMessageStore messageStore;
+    private ConcurrentMap<String, TopicConfig> topicConfigTableMap;
 
     @Before
     public void init() throws Exception {
-        messageStore = createMessageStore(null, true);
+        this.topicConfigTableMap = new ConcurrentHashMap<>();
+        messageStore = (DefaultMessageStore) createMessageStore(null, true, 
this.topicConfigTableMap);
         messageStore.load();
         messageStore.start();
     }
@@ -76,7 +81,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     @Test
     public void testSendMessagesToCqTopic() {
         String topic = UUID.randomUUID().toString();
-        createTopic(topic, CQType.SimpleCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.SimpleCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
 //        int batchNum = 10;
 
@@ -100,7 +106,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     @Test
     public void testSendMessagesToBcqTopic() {
         String topic = UUID.randomUUID().toString();
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         // case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG
 //        MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1);
@@ -123,7 +130,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     @Test
     public void testConsumeBatchMessage() {
         String topic = UUID.randomUUID().toString();
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
         int batchNum = 10;
 
         MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 
batchNum);
@@ -153,7 +161,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     @Test
     public void testNextBeginOffsetConsumeBatchMessage() {
         String topic = UUID.randomUUID().toString();
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
         Random random = new Random();
         int putMessageCount = 1000;
 
@@ -191,7 +200,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     public void testGetOffsetInQueueByTime() throws Exception {
         String topic = "testGetOffsetInQueueByTime";
 
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
         
Assert.assertTrue(QueueTypeUtils.isBatchCq(messageStore.getTopicConfig(topic)));
 
         // The initial min max offset, before and after the creation of 
consume queue
@@ -231,7 +241,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     @Test
     public void testDispatchNormalConsumeQueue() throws Exception {
         String topic = "TestDispatchBuildConsumeQueue";
-        createTopic(topic, CQType.SimpleCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.SimpleCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         long timeStart = -1;
         long timeMid = -1;
@@ -291,7 +302,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         long timeStart = -1;
         long timeMid = -1;
 
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         for (int i = 0; i < 100; i++) {
             PutMessageResult putMessageResult = 
messageStore.putMessage(buildMessage(topic, batchNum));
@@ -349,7 +361,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     public void testGetBatchMessageWithinNumber() {
         String topic = UUID.randomUUID().toString();
 
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         int batchNum = 20;
         for (int i = 0; i < 200; i++) {
@@ -409,7 +422,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     @Test
     public void testGetBatchMessageWithinSize() {
         String topic = UUID.randomUUID().toString();
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         int batchNum = 10;
         for (int i = 0; i < 100; i++) {
@@ -463,7 +477,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     }
 
     protected void putMsg(String topic) {
-        createTopic(topic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(topic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         for (int i = 0; i < TOTAL_MSGS; i++) {
             MessageExtBrokerInner message = buildMessage(topic, BATCH_NUM * (i 
% 2 + 1));
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
index c0a9c4276f..c6525bd836 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.store.queue;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -299,7 +300,7 @@ public class BatchConsumeQueueTest extends StoreTestBase {
             new BrokerStatsManager("simpleTest", true),
             (topic, queueId, logicOffset, tagsCode, msgStoreTime, 
filterBitMap, properties) -> {
             },
-            new BrokerConfig());
+            new BrokerConfig(), new ConcurrentHashMap<>());
     }
 
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
index a8379fcf02..59e1d08791 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.rocketmq.store.queue;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
-import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
@@ -36,10 +38,14 @@ import static org.junit.Assert.assertEquals;
 
 public class ConsumeQueueStoreTest extends QueueTestBase {
     private MessageStore messageStore;
+    private ConcurrentMap<String, TopicConfig> topicConfigTableMap;
+
+
 
     @Before
     public void init() throws Exception {
-        messageStore = createMessageStore(null, true);
+        this.topicConfigTableMap = new ConcurrentHashMap<>();
+        messageStore = createMessageStore(null, true, topicConfigTableMap);
         messageStore.load();
         messageStore.start();
     }
@@ -56,7 +62,8 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
     @Test
     public void testLoadConsumeQueuesWithWrongAttribute() {
         String normalTopic = UUID.randomUUID().toString();
-        createTopic(normalTopic, CQType.SimpleCQ, messageStore);
+        ConcurrentMap<String, TopicConfig> topicConfigTable = 
createTopicConfigTable(normalTopic, CQType.SimpleCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         for (int i = 0; i < 10; i++) {
             PutMessageResult putMessageResult = 
messageStore.putMessage(buildMessage(normalTopic, -1));
@@ -66,10 +73,10 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
         await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
 
         // simulate delete topic but with files left.
-        ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+        this.topicConfigTableMap.clear();
 
-        createTopic(normalTopic, CQType.BatchCQ, messageStore);
-        messageStore.shutdown();
+        topicConfigTable = createTopicConfigTable(normalTopic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         RuntimeException runtimeException = 
Assert.assertThrows(RuntimeException.class, () -> 
messageStore.getQueueStore().load());
         Assert.assertTrue(runtimeException.getMessage().endsWith("should be 
SimpleCQ, but is BatchCQ"));
@@ -78,7 +85,8 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
     @Test
     public void testLoadBatchConsumeQueuesWithWrongAttribute() {
         String batchTopic = UUID.randomUUID().toString();
-        createTopic(batchTopic, CQType.BatchCQ, messageStore);
+        ConcurrentMap<String, TopicConfig>  topicConfigTable = 
createTopicConfigTable(batchTopic, CQType.BatchCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
 
         for (int i = 0; i < 10; i++) {
             PutMessageResult putMessageResult = 
messageStore.putMessage(buildMessage(batchTopic, 10));
@@ -88,9 +96,10 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
         await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
 
         // simulate delete topic but with files left.
-        ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+        this.topicConfigTableMap.clear();
 
-        createTopic(batchTopic, CQType.SimpleCQ, messageStore);
+        topicConfigTable = createTopicConfigTable(batchTopic, CQType.SimpleCQ);
+        this.topicConfigTableMap.putAll(topicConfigTable);
         messageStore.shutdown();
 
         RuntimeException runtimeException = 
Assert.assertThrows(RuntimeException.class, () -> 
messageStore.getQueueStore().load());
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index 6a8bfc5bc6..c3c8be52dd 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
@@ -74,7 +75,7 @@ public class ConsumeQueueTest extends QueueTestBase {
         DefaultMessageStore master = new DefaultMessageStore(
             messageStoreConfig, new BrokerStatsManager(brokerConfig),
             (topic, queueId, logicOffset, tagsCode, msgStoreTime, 
filterBitMap, properties) -> {
-            }, brokerConfig);
+            }, brokerConfig, new ConcurrentHashMap<>());
 
         assertThat(master.load()).isTrue();
 
@@ -112,7 +113,7 @@ public class ConsumeQueueTest extends QueueTestBase {
     public void testIterator() throws Exception {
         final int msgNum = 100;
         final int msgSize = 1000;
-        MessageStore messageStore =  createMessageStore(null, true);
+        MessageStore messageStore =  createMessageStore(null, true, null);
         messageStore.load();
         String topic = UUID.randomUUID().toString();
         //The initial min max offset, before and after the creation of consume 
queue
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java 
b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
index a1e1cc1f5c..81dc158db5 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
@@ -41,7 +41,7 @@ import java.util.concurrent.ConcurrentMap;
 
 public class QueueTestBase extends StoreTestBase {
 
-    protected void createTopic(String topic, CQType cqType, MessageStore 
messageStore) {
+    protected ConcurrentMap<String, TopicConfig> createTopicConfigTable(String 
topic, CQType cqType) {
         ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
         TopicConfig topicConfigToBeAdded = new TopicConfig();
 
@@ -51,14 +51,14 @@ public class QueueTestBase extends StoreTestBase {
         topicConfigToBeAdded.setAttributes(attributes);
 
         topicConfigTable.put(topic, topicConfigToBeAdded);
-        ((DefaultMessageStore) 
messageStore).setTopicConfigTable(topicConfigTable);
+        return topicConfigTable;
     }
 
     protected Callable<Boolean> fullyDispatched(MessageStore messageStore) {
         return () -> messageStore.dispatchBehindBytes() == 0;
     }
 
-    protected MessageStore createMessageStore(String baseDir, boolean extent) 
throws Exception {
+    protected MessageStore createMessageStore(String baseDir, boolean extent,  
ConcurrentMap<String, TopicConfig> topicConfigTable) throws Exception {
         if (baseDir == null) {
             baseDir = createBaseDir();
         }
@@ -86,7 +86,7 @@ public class QueueTestBase extends StoreTestBase {
             new BrokerStatsManager("simpleTest", true),
             (topic, queueId, logicOffset, tagsCode, msgStoreTime, 
filterBitMap, properties) -> {
             },
-            new BrokerConfig());
+            new BrokerConfig(), topicConfigTable);
     }
 
     public MessageExtBrokerInner buildMessage(String topic, int batchNum) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 7ace2d9fe6..63ec97cdb0 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -99,7 +100,7 @@ public class TimerMessageStoreTest {
         storeConfig.setTimerInterceptDelayLevel(true);
         storeConfig.setTimerPrecisionMs(precisionMs);
 
-        messageStore = new DefaultMessageStore(storeConfig, new 
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new 
BrokerConfig());
+        messageStore = new DefaultMessageStore(storeConfig, new 
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new 
BrokerConfig(), new ConcurrentHashMap<>());
         boolean load = messageStore.load();
         assertTrue(load);
         messageStore.start();


Reply via email to