This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9b3b551a0 [ISSUE #5823] Fix delete topic in message store interface (#5828)
9b3b551a0 is described below

commit 9b3b551a0f5059a6f6b174dccf0d6a9befe1df18
Author: lizhimins <[email protected]>
AuthorDate: Thu Jan 5 18:39:43 2023 +0800

    [ISSUE #5823] Fix delete topic in message store interface (#5828)
    
    * [ISSUE #5823] Fix delete topic in message store interface
    
    * [ISSUE #5823] Add delete topic test
    
    Co-authored-by: 斜阳 <[email protected]>
---
 .../apache/rocketmq/store/DefaultMessageStore.java |  5 ++-
 .../rocketmq/store/DefaultMessageStoreTest.java    | 49 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2b829637a..93d006245 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1322,6 +1322,9 @@ public class DefaultMessageStore implements MessageStore {
                 this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), cq.getQueueId());
             }
 
+            // remove topic from cq table
+            this.consumeQueueStore.getConsumeQueueTable().remove(topic);
+
             if (this.brokerConfig.isAutoDeleteUnusedStats()) {
                 this.brokerStatsManager.onTopicDeleted(topic);
             }
@@ -1348,7 +1351,7 @@ public class DefaultMessageStore implements MessageStore {
     public int cleanUnusedTopic(final Set<String> retainTopics) {
         Set<String> consumeQueueTopicSet = this.getConsumeQueueTable().keySet();
         int deleteCount = 0;
-        for (String topicName : Sets.difference(retainTopics, consumeQueueTopicSet)) {
+        for (String topicName : Sets.difference(consumeQueueTopicSet, retainTopics)) {
             if (retainTopics.contains(topicName) ||
                 TopicValidator.isSystemTopic(topicName) ||
                 MixAll.isLmq(topicName)) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index f0939ec8b..cb63f589a 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.store;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.RandomAccessFile;
 import java.lang.reflect.InvocationTargetException;
@@ -30,10 +31,13 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -52,6 +56,7 @@ import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.assertj.core.util.Strings;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -867,6 +872,50 @@ public class DefaultMessageStoreTest {
         messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
     }
 
+    @Test
+    public void testDeleteTopics() {
+        MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig();
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable =
+            ((DefaultMessageStore) messageStore).getConsumeQueueTable();
+        for (int i = 0; i < 10; i++) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> cqTable = new ConcurrentHashMap<>();
+            String topicName = "topic-" + i;
+            for (int j = 0; j < 4; j++) {
+                ConsumeQueue consumeQueue = new ConsumeQueue(topicName, j, messageStoreConfig.getStorePathRootDir(),
+                    messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+                cqTable.put(j, consumeQueue);
+            }
+            consumeQueueTable.put(topicName, cqTable);
+        }
+        Assert.assertEquals(consumeQueueTable.size(), 10);
+        HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
+        messageStore.deleteTopics(Sets.difference(consumeQueueTable.keySet(), resultSet));
+        Assert.assertEquals(consumeQueueTable.size(), 2);
+        Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+    }
+
+    @Test
+    public void testCleanUnusedTopic() {
+        MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig();
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable =
+            ((DefaultMessageStore) messageStore).getConsumeQueueTable();
+        for (int i = 0; i < 10; i++) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> cqTable = new ConcurrentHashMap<>();
+            String topicName = "topic-" + i;
+            for (int j = 0; j < 4; j++) {
+                ConsumeQueue consumeQueue = new ConsumeQueue(topicName, j, messageStoreConfig.getStorePathRootDir(),
+                    messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+                cqTable.put(j, consumeQueue);
+            }
+            consumeQueueTable.put(topicName, cqTable);
+        }
+        Assert.assertEquals(consumeQueueTable.size(), 10);
+        HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
+        messageStore.cleanUnusedTopic(resultSet);
+        Assert.assertEquals(consumeQueueTable.size(), 2);
+        Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+    }
+
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,

Reply via email to