This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new dd62ed0f3b [ISSUE #8892] Add test cases to config manager v2 (#8873)
dd62ed0f3b is described below
commit dd62ed0f3b16919adec5d5eece21a1050dc9c5a0
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Oct 29 20:07:49 2024 +0800
[ISSUE #8892] Add test cases to config manager v2 (#8873)
* fix: add unit test for TopicConfigManagerV2
Signed-off-by: Li Zhanhui <[email protected]>
* fix: add unit test cases for config manager v2
Signed-off-by: Li Zhanhui <[email protected]>
* chore: add copyright header
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
---
.../rocketmq/broker/config/v2/ConfigStorage.java | 14 +-
.../broker/config/v2/ConsumerOffsetManagerV2.java | 2 +
.../config/v2/SubscriptionGroupManagerV2.java | 2 +
.../config/v2/ConsumerOffsetManagerV2Test.java | 193 +++++++++++++++++++++
.../config/v2/SubscriptionGroupManagerV2Test.java | 141 +++++++++++++++
.../broker/config/v2/TopicConfigManagerV2Test.java | 123 +++++++++++++
6 files changed, 471 insertions(+), 4 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
index af259aaa37..a31b573daa 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
@@ -27,11 +27,11 @@ import
org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.config.ConfigHelper;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DirectSlice;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
@@ -112,10 +112,16 @@ public class ConfigStorage extends AbstractRocksDBStorage
{
readOptions.setTotalOrderSeek(true);
readOptions.setTailing(false);
readOptions.setAutoPrefixMode(true);
- readOptions.setIterateLowerBound(new DirectSlice(beginKey));
- readOptions.setIterateUpperBound(new DirectSlice(endKey));
+ // Use Slice instead of DirectSlice until the following issue is fixed:
+ // https://github.com/facebook/rocksdb/issues/13098
+ //
+ // readOptions.setIterateUpperBound(new DirectSlice(endKey));
+ byte[] buf = new byte[endKey.remaining()];
+ endKey.slice().get(buf);
+ readOptions.setIterateUpperBound(new Slice(buf));
+
RocksIterator iterator = db.newIterator(defaultCFHandle,
readOptions);
- iterator.seekToFirst();
+ iterator.seek(beginKey.slice());
return iterator;
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 5b0885c491..2c5d3677d8 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -390,10 +390,12 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(8);
+ valueBuf.writeLong(offset);
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ configStorage.write(writeBatch);
} catch (RocksDBException e) {
LOG.error("Failed to commit pull offset. group={}, topic={},
queueId={}, offset={}",
group, topic, queueId, offset);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
index 8da6f9d2bc..f535fa195a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
@@ -74,6 +74,7 @@ public class SubscriptionGroupManagerV2 extends
SubscriptionGroupManager {
if (null != subscriptionGroupConfig) {
super.updateSubscriptionGroupConfigWithoutPersist(subscriptionGroupConfig);
}
+ iterator.next();
}
} finally {
beginKey.release();
@@ -163,6 +164,7 @@ public class SubscriptionGroupManagerV2 extends
SubscriptionGroupManager {
writeBatch.delete(ConfigHelper.readBytes(keyBuf));
long stateMachineVersion =
brokerController.getMessageStore().getStateMachineVersion();
ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ configStorage.write(writeBatch);
} catch (RocksDBException e) {
log.error("Failed to remove subscription group config by
group-name={}", groupName, e);
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
new file mode 100644
index 0000000000..d7f46855e1
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v2;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerOffsetManagerV2Test {
+
+ private ConfigStorage configStorage;
+
+ private ConsumerOffsetManagerV2 consumerOffsetManagerV2;
+
+ @Mock
+ private BrokerController controller;
+
+ @Rule
+ public TemporaryFolder tf = new TemporaryFolder();
+
+ @After
+ public void cleanUp() {
+ if (null != configStorage) {
+ configStorage.shutdown();
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
+
+ File configStoreDir = tf.newFolder();
+ configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+ configStorage.start();
+ consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
+ }
+
+ /**
+ * Verify consumer offset can survive restarts
+ */
+ @Test
+ public void testCommitOffset_Standard() {
+ Assert.assertTrue(consumerOffsetManagerV2.load());
+
+ String clientHost = "localhost";
+ String topic = "T1";
+ String group = "G0";
+ int queueId = 1;
+ long queueOffset = 100;
+ consumerOffsetManagerV2.commitOffset(clientHost, group, topic,
queueId, queueOffset);
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+
+ configStorage.shutdown();
+ consumerOffsetManagerV2.getOffsetTable().clear();
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
+
+ configStorage.start();
+ consumerOffsetManagerV2.load();
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+ }
+
+ /**
+ * Verify commit offset can survive config store restart
+ */
+ @Test
+ public void testCommitOffset_LMQ() {
+ Assert.assertTrue(consumerOffsetManagerV2.load());
+
+ String clientHost = "localhost";
+ String topic = MixAll.LMQ_PREFIX + "T1";
+ String group = "G0";
+ int queueId = 1;
+ long queueOffset = 100;
+ consumerOffsetManagerV2.commitOffset(clientHost, group, topic,
queueId, queueOffset);
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+
+ configStorage.shutdown();
+
+ configStorage.start();
+ consumerOffsetManagerV2.load();
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+ }
+
+
+ /**
+ * Verify committed pull offset can survive config store restart
+ */
+ @Test
+ public void testCommitPullOffset_LMQ() {
+ Assert.assertTrue(consumerOffsetManagerV2.load());
+
+ String clientHost = "localhost";
+ String topic = MixAll.LMQ_PREFIX + "T1";
+ String group = "G0";
+ int queueId = 1;
+ long queueOffset = 100;
+ consumerOffsetManagerV2.commitPullOffset(clientHost, group, topic,
queueId, queueOffset);
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
+
+ configStorage.shutdown();
+
+ configStorage.start();
+ consumerOffsetManagerV2.load();
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
+ }
+
+ /**
+ * Verify removing the offsets of one topic/group key survives config store restart and leaves other topics intact
+ */
+ @Test
+ public void testRemoveByTopicAtGroup() {
+ Assert.assertTrue(consumerOffsetManagerV2.load());
+
+ String clientHost = "localhost";
+ String topic = MixAll.LMQ_PREFIX + "T1";
+ String topic2 = MixAll.LMQ_PREFIX + "T2";
+ String group = "G0";
+ int queueId = 1;
+ long queueOffset = 100;
+ consumerOffsetManagerV2.commitOffset(clientHost, group, topic,
queueId, queueOffset);
+ consumerOffsetManagerV2.commitOffset(clientHost, group, topic2,
queueId, queueOffset);
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
+
+ consumerOffsetManagerV2.removeConsumerOffset(topic +
ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + group);
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
+
+ configStorage.shutdown();
+ configStorage.start();
+ consumerOffsetManagerV2.load();
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
+ }
+
+ /**
+ * Verify removing all offsets of a group survives config store restart
+ */
+ @Test
+ public void testRemoveByGroup() {
+ Assert.assertTrue(consumerOffsetManagerV2.load());
+
+ String clientHost = "localhost";
+ String topic = MixAll.LMQ_PREFIX + "T1";
+ String topic2 = MixAll.LMQ_PREFIX + "T2";
+ String group = "G0";
+ int queueId = 1;
+ long queueOffset = 100;
+ consumerOffsetManagerV2.commitOffset(clientHost, group, topic,
queueId, queueOffset);
+ consumerOffsetManagerV2.commitOffset(clientHost, group, topic2,
queueId, queueOffset);
+ Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+ consumerOffsetManagerV2.removeOffset(group);
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic2, queueId));
+
+ configStorage.shutdown();
+ configStorage.start();
+ consumerOffsetManagerV2.load();
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
+ Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic2, queueId));
+ }
+
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
new file mode 100644
index 0000000000..6d436a7c4d
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v2;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
+import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionGroupManagerV2Test {
+ private ConfigStorage configStorage;
+
+ private SubscriptionGroupManagerV2 subscriptionGroupManagerV2;
+
+ @Mock
+ private BrokerController controller;
+
+ @Mock
+ private MessageStore messageStore;
+
+ @Rule
+ public TemporaryFolder tf = new TemporaryFolder();
+
+ @After
+ public void cleanUp() {
+ if (null != configStorage) {
+ configStorage.shutdown();
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setAutoCreateSubscriptionGroup(false);
+ Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
+
+ Mockito.doReturn(messageStore).when(controller).getMessageStore();
+ Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();
+
+ File configStoreDir = tf.newFolder();
+ configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+ configStorage.start();
+ subscriptionGroupManagerV2 = new
SubscriptionGroupManagerV2(controller, configStorage);
+ }
+
+
+ @Test
+ public void testUpdateSubscriptionGroupConfig() {
+ SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName("G1");
+ subscriptionGroupConfig.setConsumeEnable(true);
+ subscriptionGroupConfig.setRetryMaxTimes(16);
+ subscriptionGroupConfig.setGroupSysFlag(1);
+ GroupRetryPolicy retryPolicy = new GroupRetryPolicy();
+ retryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+ subscriptionGroupConfig.setGroupRetryPolicy(retryPolicy);
+ subscriptionGroupConfig.setBrokerId(1);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ subscriptionGroupConfig.setConsumeMessageOrderly(true);
+ subscriptionGroupConfig.setConsumeTimeoutMinute(30);
+ subscriptionGroupConfig.setConsumeFromMinEnable(true);
+ subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(1);
+ subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(true);
+
subscriptionGroupManagerV2.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+
+ SubscriptionGroupConfig found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+ Assert.assertEquals(subscriptionGroupConfig, found);
+
+ subscriptionGroupManagerV2.getSubscriptionGroupTable().clear();
+ configStorage.shutdown();
+ configStorage.start();
+ subscriptionGroupManagerV2.load();
+ found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+ Assert.assertEquals(subscriptionGroupConfig, found);
+ }
+
+
+ @Test
+ public void testDeleteSubscriptionGroupConfig() {
+ SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName("G1");
+ subscriptionGroupConfig.setConsumeEnable(true);
+ subscriptionGroupConfig.setRetryMaxTimes(16);
+ subscriptionGroupConfig.setGroupSysFlag(1);
+ GroupRetryPolicy retryPolicy = new GroupRetryPolicy();
+ retryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+ subscriptionGroupConfig.setGroupRetryPolicy(retryPolicy);
+ subscriptionGroupConfig.setBrokerId(1);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ subscriptionGroupConfig.setConsumeMessageOrderly(true);
+ subscriptionGroupConfig.setConsumeTimeoutMinute(30);
+ subscriptionGroupConfig.setConsumeFromMinEnable(true);
+ subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(1);
+ subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(true);
+
subscriptionGroupManagerV2.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+
+ SubscriptionGroupConfig found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+ Assert.assertEquals(subscriptionGroupConfig, found);
+
subscriptionGroupManagerV2.removeSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+
+ found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+ Assert.assertNull(found);
+
+ configStorage.shutdown();
+ configStorage.start();
+ subscriptionGroupManagerV2.load();
+ found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+ Assert.assertNull(found);
+ }
+
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
new file mode 100644
index 0000000000..92c936b110
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(value = MockitoJUnitRunner.class)
+public class TopicConfigManagerV2Test {
+
+ private ConfigStorage configStorage;
+
+ private TopicConfigManagerV2 topicConfigManagerV2;
+
+ @Mock
+ private BrokerController controller;
+
+ @Rule
+ public TemporaryFolder tf = new TemporaryFolder();
+
+ @After
+ public void cleanUp() {
+ if (null != configStorage) {
+ configStorage.shutdown();
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
+
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
+
+ File configStoreDir = tf.newFolder();
+ configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+ configStorage.start();
+ topicConfigManagerV2 = new TopicConfigManagerV2(controller,
configStorage);
+ }
+
+ @Test
+ public void testUpdateTopicConfig() {
+ TopicConfig topicConfig = new TopicConfig();
+ String topicName = "T1";
+ topicConfig.setTopicName(topicName);
+ topicConfig.setPerm(6);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(4);
+ topicConfig.setOrder(true);
+ topicConfig.setTopicSysFlag(4);
+ topicConfigManagerV2.updateTopicConfig(topicConfig);
+
+ Assert.assertTrue(configStorage.shutdown());
+
+ topicConfigManagerV2.getTopicConfigTable().clear();
+
+ Assert.assertTrue(configStorage.start());
+ Assert.assertTrue(topicConfigManagerV2.load());
+
+ TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName);
+ Assert.assertNotNull(loaded);
+ Assert.assertEquals(topicName, loaded.getTopicName());
+ Assert.assertEquals(6, loaded.getPerm());
+ Assert.assertEquals(8, loaded.getReadQueueNums());
+ Assert.assertEquals(4, loaded.getWriteQueueNums());
+ Assert.assertTrue(loaded.isOrder());
+ Assert.assertEquals(4, loaded.getTopicSysFlag());
+
+ Assert.assertTrue(topicConfigManagerV2.containsTopic(topicName));
+ }
+
+ @Test
+ public void testRemoveTopicConfig() {
+ TopicConfig topicConfig = new TopicConfig();
+ String topicName = "T1";
+ topicConfig.setTopicName(topicName);
+ topicConfig.setPerm(6);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(4);
+ topicConfig.setOrder(true);
+ topicConfig.setTopicSysFlag(4);
+ topicConfigManagerV2.updateTopicConfig(topicConfig);
+ topicConfigManagerV2.removeTopicConfig(topicName);
+ Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
+ Assert.assertTrue(configStorage.shutdown());
+
+ Assert.assertTrue(configStorage.start());
+ Assert.assertTrue(topicConfigManagerV2.load());
+ Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
+ }
+}