This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3fe81bfbac [ISSUE #6545] Remove getTopicConfigs method in interface
MessageStore (#6531)
3fe81bfbac is described below
commit 3fe81bfbac11ee9a3bf9c21155d2c9c69570bb72
Author: Ao Qiao <[email protected]>
AuthorDate: Sat Apr 15 18:22:06 2023 +0800
[ISSUE #6545] Remove getTopicConfigs method in interface MessageStore
(#6531)
* change map -> lambda
* f
* fix unit test
* remove getTopicConfig function
* Update MultiDispatchTest.java
* Update CompactionStore.java
* update
* update test
* update test
* Update BatchConsumeMessageTest.java
* Update BrokerController.java
* Update BrokerController.java
* check
* Update BrokerController.java
* Update BatchConsumeMessageTest.java
---
.../apache/rocketmq/broker/BrokerController.java | 3 +-
.../broker/filter/MessageStoreWithFilterTest.java | 3 +-
.../schedule/ScheduleMessageServiceTest.java | 2 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 18 +++++-----
.../org/apache/rocketmq/store/MessageStore.java | 18 ----------
.../apache/rocketmq/store/kv/CompactionStore.java | 6 ++--
.../store/plugin/AbstractPluginMessageStore.java | 13 -------
.../rocketmq/store/queue/ConsumeQueueStore.java | 26 ++------------
.../apache/rocketmq/store/AppendCallbackTest.java | 3 +-
.../apache/rocketmq/store/BatchPutMessageTest.java | 3 +-
.../apache/rocketmq/store/ConsumeQueueTest.java | 5 +--
.../store/DefaultMessageStoreCleanFilesTest.java | 3 +-
.../store/DefaultMessageStoreShutDownTest.java | 3 +-
.../rocketmq/store/DefaultMessageStoreTest.java | 4 +--
.../java/org/apache/rocketmq/store/HATest.java | 3 +-
.../apache/rocketmq/store/MultiDispatchTest.java | 3 +-
.../store/dledger/DLedgerMultiPathTest.java | 3 +-
.../store/dledger/MessageStoreTestBase.java | 5 +--
.../store/ha/autoswitch/AutoSwitchHATest.java | 3 +-
.../store/queue/BatchConsumeMessageTest.java | 41 +++++++++++++++-------
.../store/queue/BatchConsumeQueueTest.java | 3 +-
.../store/queue/ConsumeQueueStoreTest.java | 27 +++++++++-----
.../rocketmq/store/queue/ConsumeQueueTest.java | 5 +--
.../apache/rocketmq/store/queue/QueueTestBase.java | 8 ++---
.../store/timer/TimerMessageStoreTest.java | 3 +-
25 files changed, 101 insertions(+), 113 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 70e59e098d..a35618dc0a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -737,8 +737,7 @@ public class BrokerController {
if (result) {
try {
- DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig);
-
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());
+ DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig,
topicConfigManager.getTopicConfigTable());
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new
DLedgerRoleChangeHandler(this, defaultMessageStore);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index 678c5079df..84bca91699 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
@@ -159,7 +160,7 @@ public class MessageStoreWithFilterTest {
long msgStoreTime, byte[] filterBitMap,
Map<String, String> properties) {
}
}
- , brokerConfig);
+ , brokerConfig, new ConcurrentHashMap<>());
master.getDispatcherList().addFirst(new CommitLogDispatcher() {
@Override
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
index d3f6753d23..b90fb2931d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
@@ -119,7 +119,7 @@ public class ScheduleMessageServiceTest {
brokerConfig = new BrokerConfig();
BrokerStatsManager manager = new
BrokerStatsManager(brokerConfig.getBrokerClusterName(),
brokerConfig.isEnableDetailStat());
- messageStore = new DefaultMessageStore(messageStoreConfig, manager,
new MyMessageArrivingListener(), new BrokerConfig());
+ messageStore = new DefaultMessageStore(messageStoreConfig, manager,
new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
assertThat(messageStore.load()).isTrue();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e51132bbf2..73b0f42e58 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -200,16 +200,20 @@ public class DefaultMessageStore implements MessageStore {
private long stateMachineVersion = 0L;
+ // Supplied by the caller (e.g. the broker's topic config table); treat as read-only here.
+ private ConcurrentMap<String, TopicConfig> topicConfigTable;
+
private final ScheduledExecutorService scheduledCleanQueueExecutorService =
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
- final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig) throws IOException {
+ final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig>
topicConfigTable) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.aliveReplicasNum = messageStoreConfig.getTotalReplicas();
this.brokerStatsManager = brokerStatsManager;
+ this.topicConfigTable = topicConfigTable;
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
@@ -2047,18 +2051,16 @@ public class DefaultMessageStore implements
MessageStore {
}
}
- @Override
public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
- return this.consumeQueueStore.getTopicConfigs();
+ return this.topicConfigTable;
}
- @Override
public Optional<TopicConfig> getTopicConfig(String topic) {
- return this.consumeQueueStore.getTopicConfig(topic);
- }
+ if (this.topicConfigTable == null) {
+ return Optional.empty();
+ }
- public void setTopicConfigTable(ConcurrentMap<String, TopicConfig>
topicConfigTable) {
- this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
+ return Optional.ofNullable(this.topicConfigTable.get(topic));
}
public BrokerIdentity getBrokerIdentity() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index f77739fc47..a7da245551 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -25,15 +25,12 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
-import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -736,21 +733,6 @@ public interface MessageStore {
*/
void assignOffset(MessageExtBrokerInner msg, short messageNum);
- /**
- * get all topic config
- *
- * @return all topic config info
- */
- Map<String, TopicConfig> getTopicConfigs();
-
- /**
- * get topic config
- *
- * @param topic topic name
- * @return topic config info
- */
- Optional<TopicConfig> getTopicConfig(String topic);
-
/**
* Get master broker message store in process in broker container
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b6d73b81c7..3ba37df957 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -29,9 +29,9 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -53,7 +53,7 @@ public class CompactionStore {
private final String compactionPath;
private final String compactionLogPath;
private final String compactionCqPath;
- private final MessageStore defaultMessageStore;
+ private final DefaultMessageStore defaultMessageStore;
private final CompactionPositionMgr positionMgr;
private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
private final ScheduledExecutorService compactionSchedule;
@@ -65,7 +65,7 @@ public class CompactionStore {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- public CompactionStore(MessageStore defaultMessageStore) {
+ public CompactionStore(DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.compactionLogTable = new ConcurrentHashMap<>();
MessageStoreConfig config =
defaultMessageStore.getMessageStoreConfig();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 3f43adc12d..89c3e53b6b 100644
---
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -26,15 +26,12 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
-import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -595,16 +592,6 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
next.assignOffset(msg, messageNum);
}
- @Override
- public Map<String, TopicConfig> getTopicConfigs() {
- return next.getTopicConfigs();
- }
-
- @Override
- public Optional<TopicConfig> getTopicConfig(String topic) {
- return next.getTopicConfig(topic);
- }
-
@Override
public List<PutMessageHook> getPutMessageHookList() {
return next.getPutMessageHookList();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 082e7bbb38..7d7878f129 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -64,20 +64,12 @@ public class ConsumeQueueStore {
protected final QueueOffsetAssigner queueOffsetAssigner = new
QueueOffsetAssigner();
protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/*
queueId */, ConsumeQueueInterface>> consumeQueueTable;
- // Should be careful, do not change the topic config
- // TopicConfigManager is more suitable here.
- private ConcurrentMap<String, TopicConfig> topicConfigTable;
-
public ConsumeQueueStore(DefaultMessageStore messageStore,
MessageStoreConfig messageStoreConfig) {
this.messageStore = messageStore;
this.messageStoreConfig = messageStoreConfig;
this.consumeQueueTable = new ConcurrentHashMap<>(32);
}
- public void setTopicConfigTable(ConcurrentMap<String, TopicConfig>
topicConfigTable) {
- this.topicConfigTable = topicConfigTable;
- }
-
private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
return findOrCreateConsumeQueue(topic, queueId);
}
@@ -173,9 +165,9 @@ public class ConsumeQueueStore {
}
private void queueTypeShouldBe(String topic, CQType cqTypeExpected) {
- TopicConfig topicConfig = this.topicConfigTable == null ? null :
this.topicConfigTable.get(topic);
+ Optional<TopicConfig> topicConfig =
this.messageStore.getTopicConfig(topic);
- CQType cqTypeActual =
QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig));
+ CQType cqTypeActual = QueueTypeUtils.getCQType(topicConfig);
if (!Objects.equals(cqTypeExpected, cqTypeActual)) {
throw new RuntimeException(format("The queue type of topic: %s
should be %s, but is %s", topic, cqTypeExpected, cqTypeActual));
@@ -341,7 +333,7 @@ public class ConsumeQueueStore {
ConsumeQueueInterface newLogic;
- Optional<TopicConfig> topicConfig = this.getTopicConfig(topic);
+ Optional<TopicConfig> topicConfig =
this.messageStore.getTopicConfig(topic);
// TODO maybe the topic has been deleted.
if (Objects.equals(CQType.BatchCQ,
QueueTypeUtils.getCQType(topicConfig))) {
newLogic = new BatchConsumeQueue(
@@ -537,18 +529,6 @@ public class ConsumeQueueStore {
}
}
- public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
- return this.topicConfigTable;
- }
-
- public Optional<TopicConfig> getTopicConfig(String topic) {
- if (this.topicConfigTable == null) {
- return Optional.empty();
- }
-
- return Optional.ofNullable(this.topicConfigTable.get(topic));
- }
-
public long getTotalSize() {
long totalSize = 0;
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps :
this.consumeQueueTable.values()) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index dc1af78b3f..87bfe85da2 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
@@ -55,7 +56,7 @@ public class AppendCallbackTest {
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") +
File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") +
File.separator + "unitteststore" + File.separator + "commitlog");
//too much reference
- DefaultMessageStore messageStore = new
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig());
+ DefaultMessageStore messageStore = new
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new
ConcurrentHashMap<>());
CommitLog commitLog = new CommitLog(messageStore);
callback = commitLog.new DefaultAppendMessageCallback();
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 8332f38c3c..43ca38eb48 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
@@ -78,7 +79,7 @@ public class BatchPutMessageTest {
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") +
File.separator
+ "putmessagesteststore" + File.separator + "commitlog");
messageStoreConfig.setHaListenPort(0);
- return new DefaultMessageStore(messageStoreConfig, new
BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new
BrokerConfig());
+ return new DefaultMessageStore(messageStoreConfig, new
BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new
BrokerConfig(), new ConcurrentHashMap<>());
}
@Test
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index d80a6f25fa..2e08369bde 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -151,7 +152,7 @@ public class ConsumeQueueTest {
long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
}
}
- , brokerConfig);
+ , brokerConfig, new ConcurrentHashMap<>());
assertThat(master.load()).isTrue();
@@ -179,7 +180,7 @@ public class ConsumeQueueTest {
long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
}
}
- , brokerConfig);
+ , brokerConfig, new ConcurrentHashMap<>());
assertThat(master.load()).isTrue();
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 601d50c0f5..083aabc48b 100644
---
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
@@ -483,7 +484,7 @@ public class DefaultMessageStoreCleanFilesTest {
private void initMessageStore(MessageStoreConfig messageStoreConfig,
double diskSpaceCleanForciblyRatio) throws Exception {
messageStore = new DefaultMessageStore(messageStoreConfig,
- new BrokerStatsManager("test", true), new
MyMessageArrivingListener(), new BrokerConfig());
+ new BrokerStatsManager("test", true), new
MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
cleanCommitLogService = getCleanCommitLogService();
cleanConsumeQueueService = getCleanConsumeQueueService();
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index 7329098a38..515a4845a4 100644
---
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.store;
import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
@@ -74,7 +75,7 @@ public class DefaultMessageStoreShutDownTest {
String storeRootPath = System.getProperty("java.io.tmpdir") +
File.separator + "store";
messageStoreConfig.setStorePathRootDir(storeRootPath);
messageStoreConfig.setHaListenPort(0);
- return new DefaultMessageStore(messageStoreConfig, new
BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+ return new DefaultMessageStore(messageStoreConfig, new
BrokerStatsManager("simpleTest", true), null, new BrokerConfig(), new
ConcurrentHashMap<>());
}
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 4675157166..2f22de4d11 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -99,7 +99,7 @@ public class DefaultMessageStoreTest {
messageStoreConfig.setMaxIndexNum(100 * 10);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") +
File.separator + "store");
messageStoreConfig.setHaListenPort(0);
- MessageStore master = new DefaultMessageStore(messageStoreConfig,
null, new MyMessageArrivingListener(), new BrokerConfig());
+ MessageStore master = new DefaultMessageStore(messageStoreConfig,
null, new MyMessageArrivingListener(), new BrokerConfig(), new
ConcurrentHashMap<>());
boolean load = master.load();
assertTrue(load);
@@ -144,7 +144,7 @@ public class DefaultMessageStoreTest {
return new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("simpleTest", true),
new MyMessageArrivingListener(),
- new BrokerConfig());
+ new BrokerConfig(), new ConcurrentHashMap<>());
}
@Test
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index e1dc16bdc1..38a0435817 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -27,6 +27,7 @@ import java.net.SocketAddress;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
@@ -250,7 +251,7 @@ public class HATest {
private MessageStore buildMessageStore(MessageStoreConfig
messageStoreConfig, long brokerId) throws Exception {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerId(brokerId);
- return new DefaultMessageStore(messageStoreConfig, brokerStatsManager,
null, brokerConfig);
+ return new DefaultMessageStore(messageStoreConfig, brokerStatsManager,
null, brokerConfig, new ConcurrentHashMap<>());
}
private void buildMessageStoreConfig(MessageStoreConfig
messageStoreConfig) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index daa17eef88..3ae4b2be56 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageConst;
@@ -57,7 +58,7 @@ public class MultiDispatchTest {
messageStoreConfig.setEnableMultiDispatch(true);
BrokerConfig brokerConfig = new BrokerConfig();
//too much reference
- messageStore = new DefaultMessageStore(messageStoreConfig, null, null,
brokerConfig);
+ messageStore = new DefaultMessageStore(messageStoreConfig, null, null,
brokerConfig, new ConcurrentHashMap<>());
consumeQueue = new ConsumeQueue("xxx", 0,
getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()),
messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
index ebeba5013a..5eb8320732 100644
---
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -107,7 +108,7 @@ public class DLedgerMultiPathTest extends
MessageStoreTestBase {
storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitLogTest",
true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap,
properties) -> {
- }, new BrokerConfig());
+ }, new BrokerConfig(), new ConcurrentHashMap<>());
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
diff --git
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index 3ae0cb64ec..a21806ffcf 100644
---
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -21,6 +21,7 @@ import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -60,7 +61,7 @@ public class MessageStoreTestBase extends StoreTestBase {
storeConfig.setRecheckReputOffsetFromCq(true);
DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(storeConfig, new
BrokerStatsManager("DLedgerCommitlogTest", true), (topic, queueId, logicOffset,
tagsCode, msgStoreTime, filterBitMap, properties) -> {
- }, new BrokerConfig());
+ }, new BrokerConfig(), new ConcurrentHashMap<>());
DLedgerServer dLegerServer = ((DLedgerCommitLog)
defaultMessageStore.getCommitLog()).getdLedgerServer();
if (leaderId != null) {
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
@@ -109,7 +110,7 @@ public class MessageStoreTestBase extends StoreTestBase {
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest",
true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap,
properties) -> {
- }, new BrokerConfig());
+ }, new BrokerConfig(), new ConcurrentHashMap<>());
if (createAbort) {
String fileName =
StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 3c4e7af8d5..6d105289f0 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.ha.autoswitch;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
@@ -497,7 +498,7 @@ public class AutoSwitchHATest {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerId(brokerId);
brokerConfig.setEnableControllerMode(true);
- return new DefaultMessageStore(messageStoreConfig, brokerStatsManager,
null, brokerConfig);
+ return new DefaultMessageStore(messageStoreConfig, brokerStatsManager,
null, brokerConfig, new ConcurrentHashMap<>());
}
private void buildMessageStoreConfig(MessageStoreConfig
messageStoreConfig, int mappedFileSize) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 10c2454afa..e3ac1b6bda 100644
---
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -27,6 +27,9 @@ import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -37,10 +40,10 @@ import
org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
-import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -55,11 +58,13 @@ import static org.awaitility.Awaitility.await;
public class BatchConsumeMessageTest extends QueueTestBase {
private static final int BATCH_NUM = 10;
private static final int TOTAL_MSGS = 200;
- private MessageStore messageStore;
+ private DefaultMessageStore messageStore;
+ private ConcurrentMap<String, TopicConfig> topicConfigTableMap;
@Before
public void init() throws Exception {
- messageStore = createMessageStore(null, true);
+ this.topicConfigTableMap = new ConcurrentHashMap<>();
+ messageStore = (DefaultMessageStore) createMessageStore(null, true,
this.topicConfigTableMap);
messageStore.load();
messageStore.start();
}
@@ -76,7 +81,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
@Test
public void testSendMessagesToCqTopic() {
String topic = UUID.randomUUID().toString();
- createTopic(topic, CQType.SimpleCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.SimpleCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
// int batchNum = 10;
@@ -100,7 +106,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
@Test
public void testSendMessagesToBcqTopic() {
String topic = UUID.randomUUID().toString();
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
// case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG
// MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1);
@@ -123,7 +130,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
@Test
public void testConsumeBatchMessage() {
String topic = UUID.randomUUID().toString();
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
int batchNum = 10;
MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic,
batchNum);
@@ -153,7 +161,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
@Test
public void testNextBeginOffsetConsumeBatchMessage() {
String topic = UUID.randomUUID().toString();
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
Random random = new Random();
int putMessageCount = 1000;
@@ -191,7 +200,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
public void testGetOffsetInQueueByTime() throws Exception {
String topic = "testGetOffsetInQueueByTime";
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
Assert.assertTrue(QueueTypeUtils.isBatchCq(messageStore.getTopicConfig(topic)));
// The initial min max offset, before and after the creation of
consume queue
@@ -231,7 +241,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
@Test
public void testDispatchNormalConsumeQueue() throws Exception {
String topic = "TestDispatchBuildConsumeQueue";
- createTopic(topic, CQType.SimpleCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.SimpleCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
long timeStart = -1;
long timeMid = -1;
@@ -291,7 +302,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
long timeStart = -1;
long timeMid = -1;
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
for (int i = 0; i < 100; i++) {
PutMessageResult putMessageResult =
messageStore.putMessage(buildMessage(topic, batchNum));
@@ -349,7 +361,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
public void testGetBatchMessageWithinNumber() {
String topic = UUID.randomUUID().toString();
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
int batchNum = 20;
for (int i = 0; i < 200; i++) {
@@ -409,7 +422,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
@Test
public void testGetBatchMessageWithinSize() {
String topic = UUID.randomUUID().toString();
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
int batchNum = 10;
for (int i = 0; i < 100; i++) {
@@ -463,7 +477,8 @@ public class BatchConsumeMessageTest extends QueueTestBase {
}
protected void putMsg(String topic) {
- createTopic(topic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(topic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
for (int i = 0; i < TOTAL_MSGS; i++) {
MessageExtBrokerInner message = buildMessage(topic, BATCH_NUM * (i
% 2 + 1));
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
index c0a9c4276f..c6525bd836 100644
---
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.queue;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -299,7 +300,7 @@ public class BatchConsumeQueueTest extends StoreTestBase {
new BrokerStatsManager("simpleTest", true),
(topic, queueId, logicOffset, tagsCode, msgStoreTime,
filterBitMap, properties) -> {
},
- new BrokerConfig());
+ new BrokerConfig(), new ConcurrentHashMap<>());
}
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
index a8379fcf02..59e1d08791 100644
---
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.store.queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
-import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
@@ -36,10 +38,14 @@ import static org.junit.Assert.assertEquals;
public class ConsumeQueueStoreTest extends QueueTestBase {
private MessageStore messageStore;
+ private ConcurrentMap<String, TopicConfig> topicConfigTableMap;
+
+
@Before
public void init() throws Exception {
- messageStore = createMessageStore(null, true);
+ this.topicConfigTableMap = new ConcurrentHashMap<>();
+ messageStore = createMessageStore(null, true, topicConfigTableMap);
messageStore.load();
messageStore.start();
}
@@ -56,7 +62,8 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
@Test
public void testLoadConsumeQueuesWithWrongAttribute() {
String normalTopic = UUID.randomUUID().toString();
- createTopic(normalTopic, CQType.SimpleCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(normalTopic, CQType.SimpleCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
for (int i = 0; i < 10; i++) {
PutMessageResult putMessageResult =
messageStore.putMessage(buildMessage(normalTopic, -1));
@@ -66,10 +73,10 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
// simulate delete topic but with files left.
- ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+ this.topicConfigTableMap.clear();
- createTopic(normalTopic, CQType.BatchCQ, messageStore);
- messageStore.shutdown();
+ topicConfigTable = createTopicConfigTable(normalTopic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
RuntimeException runtimeException =
Assert.assertThrows(RuntimeException.class, () ->
messageStore.getQueueStore().load());
Assert.assertTrue(runtimeException.getMessage().endsWith("should be
SimpleCQ, but is BatchCQ"));
@@ -78,7 +85,8 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
@Test
public void testLoadBatchConsumeQueuesWithWrongAttribute() {
String batchTopic = UUID.randomUUID().toString();
- createTopic(batchTopic, CQType.BatchCQ, messageStore);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
createTopicConfigTable(batchTopic, CQType.BatchCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
for (int i = 0; i < 10; i++) {
PutMessageResult putMessageResult =
messageStore.putMessage(buildMessage(batchTopic, 10));
@@ -88,9 +96,10 @@ public class ConsumeQueueStoreTest extends QueueTestBase {
await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
// simulate delete topic but with files left.
- ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+ this.topicConfigTableMap.clear();
- createTopic(batchTopic, CQType.SimpleCQ, messageStore);
+ topicConfigTable = createTopicConfigTable(batchTopic, CQType.SimpleCQ);
+ this.topicConfigTableMap.putAll(topicConfigTable);
messageStore.shutdown();
RuntimeException runtimeException =
Assert.assertThrows(RuntimeException.class, () ->
messageStore.getQueueStore().load());
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index 6a8bfc5bc6..c3c8be52dd 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
@@ -74,7 +75,7 @@ public class ConsumeQueueTest extends QueueTestBase {
DefaultMessageStore master = new DefaultMessageStore(
messageStoreConfig, new BrokerStatsManager(brokerConfig),
(topic, queueId, logicOffset, tagsCode, msgStoreTime,
filterBitMap, properties) -> {
- }, brokerConfig);
+ }, brokerConfig, new ConcurrentHashMap<>());
assertThat(master.load()).isTrue();
@@ -112,7 +113,7 @@ public class ConsumeQueueTest extends QueueTestBase {
public void testIterator() throws Exception {
final int msgNum = 100;
final int msgSize = 1000;
- MessageStore messageStore = createMessageStore(null, true);
+ MessageStore messageStore = createMessageStore(null, true, null);
messageStore.load();
String topic = UUID.randomUUID().toString();
//The initial min max offset, before and after the creation of consume
queue
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
index a1e1cc1f5c..81dc158db5 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
@@ -41,7 +41,7 @@ import java.util.concurrent.ConcurrentMap;
public class QueueTestBase extends StoreTestBase {
- protected void createTopic(String topic, CQType cqType, MessageStore
messageStore) {
+ protected ConcurrentMap<String, TopicConfig> createTopicConfigTable(String
topic, CQType cqType) {
ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<>();
TopicConfig topicConfigToBeAdded = new TopicConfig();
@@ -51,14 +51,14 @@ public class QueueTestBase extends StoreTestBase {
topicConfigToBeAdded.setAttributes(attributes);
topicConfigTable.put(topic, topicConfigToBeAdded);
- ((DefaultMessageStore)
messageStore).setTopicConfigTable(topicConfigTable);
+ return topicConfigTable;
}
protected Callable<Boolean> fullyDispatched(MessageStore messageStore) {
return () -> messageStore.dispatchBehindBytes() == 0;
}
- protected MessageStore createMessageStore(String baseDir, boolean extent)
throws Exception {
+ protected MessageStore createMessageStore(String baseDir, boolean extent,
ConcurrentMap<String, TopicConfig> topicConfigTable) throws Exception {
if (baseDir == null) {
baseDir = createBaseDir();
}
@@ -86,7 +86,7 @@ public class QueueTestBase extends StoreTestBase {
new BrokerStatsManager("simpleTest", true),
(topic, queueId, logicOffset, tagsCode, msgStoreTime,
filterBitMap, properties) -> {
},
- new BrokerConfig());
+ new BrokerConfig(), topicConfigTable);
}
public MessageExtBrokerInner buildMessage(String topic, int batchNum) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 7ace2d9fe6..63ec97cdb0 100644
---
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
@@ -99,7 +100,7 @@ public class TimerMessageStoreTest {
storeConfig.setTimerInterceptDelayLevel(true);
storeConfig.setTimerPrecisionMs(precisionMs);
- messageStore = new DefaultMessageStore(storeConfig, new
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new
BrokerConfig());
+ messageStore = new DefaultMessageStore(storeConfig, new
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new
BrokerConfig(), new ConcurrentHashMap<>());
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();