This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9b3b551a0 [ISSUE #5823] Fix delete topic in message store interface (#5828)
9b3b551a0 is described below
commit 9b3b551a0f5059a6f6b174dccf0d6a9befe1df18
Author: lizhimins <[email protected]>
AuthorDate: Thu Jan 5 18:39:43 2023 +0800
[ISSUE #5823] Fix delete topic in message store interface (#5828)
* [ISSUE #5823] Fix delete topic in message store interface
* [ISSUE #5823] Add delete topic test
Co-authored-by: 斜阳 <[email protected]>
---
.../apache/rocketmq/store/DefaultMessageStore.java | 5 ++-
.../rocketmq/store/DefaultMessageStoreTest.java | 49 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2b829637a..93d006245 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1322,6 +1322,9 @@ public class DefaultMessageStore implements MessageStore {
this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(),
cq.getQueueId());
}
+ // remove topic from cq table
+ this.consumeQueueStore.getConsumeQueueTable().remove(topic);
+
if (this.brokerConfig.isAutoDeleteUnusedStats()) {
this.brokerStatsManager.onTopicDeleted(topic);
}
@@ -1348,7 +1351,7 @@ public class DefaultMessageStore implements MessageStore {
public int cleanUnusedTopic(final Set<String> retainTopics) {
Set<String> consumeQueueTopicSet = this.getConsumeQueueTable().keySet();
int deleteCount = 0;
- for (String topicName : Sets.difference(retainTopics, consumeQueueTopicSet)) {
+ for (String topicName : Sets.difference(consumeQueueTopicSet, retainTopics)) {
if (retainTopics.contains(topicName) ||
TopicValidator.isSystemTopic(topicName) ||
MixAll.isLmq(topicName)) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index f0939ec8b..cb63f589a 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.RandomAccessFile;
import java.lang.reflect.InvocationTargetException;
@@ -30,10 +31,13 @@ import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
@@ -52,6 +56,7 @@ import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.assertj.core.util.Strings;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -867,6 +872,50 @@ public class DefaultMessageStoreTest {
messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
}
+ @Test
+ public void testDeleteTopics() {
+ MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig();
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable =
+ ((DefaultMessageStore) messageStore).getConsumeQueueTable();
+ for (int i = 0; i < 10; i++) {
+ ConcurrentMap<Integer, ConsumeQueueInterface> cqTable = new ConcurrentHashMap<>();
+ String topicName = "topic-" + i;
+ for (int j = 0; j < 4; j++) {
+ ConsumeQueue consumeQueue = new ConsumeQueue(topicName, j, messageStoreConfig.getStorePathRootDir(),
+ messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+ cqTable.put(j, consumeQueue);
+ }
+ consumeQueueTable.put(topicName, cqTable);
+ }
+ Assert.assertEquals(consumeQueueTable.size(), 10);
+ HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
+ messageStore.deleteTopics(Sets.difference(consumeQueueTable.keySet(), resultSet));
+ Assert.assertEquals(consumeQueueTable.size(), 2);
+ Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+ }
+
+ @Test
+ public void testCleanUnusedTopic() {
+ MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig();
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable =
+ ((DefaultMessageStore) messageStore).getConsumeQueueTable();
+ for (int i = 0; i < 10; i++) {
+ ConcurrentMap<Integer, ConsumeQueueInterface> cqTable = new ConcurrentHashMap<>();
+ String topicName = "topic-" + i;
+ for (int j = 0; j < 4; j++) {
+ ConsumeQueue consumeQueue = new ConsumeQueue(topicName, j, messageStoreConfig.getStorePathRootDir(),
+ messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+ cqTable.put(j, consumeQueue);
+ }
+ consumeQueueTable.put(topicName, cqTable);
+ }
+ Assert.assertEquals(consumeQueueTable.size(), 10);
+ HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
+ messageStore.cleanUnusedTopic(resultSet);
+ Assert.assertEquals(consumeQueueTable.size(), 2);
+ Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+ }
+
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long
tagsCode, long msgStoreTime,