This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 83c47dddc [ISSUE #5823] Add delete topic in message store interface 
(#5824)
83c47dddc is described below

commit 83c47dddc98f7a74b3ef956a85e0343fc8ab6725
Author: lizhimins <[email protected]>
AuthorDate: Thu Jan 5 15:28:46 2023 +0800

    [ISSUE #5823] Add delete topic in message store interface (#5824)
    
    Co-authored-by: 斜阳 <[email protected]>
---
 .../broker/processor/AdminBrokerProcessor.java     |  4 +-
 .../apache/rocketmq/store/DefaultMessageStore.java | 79 +++++++++++++++-------
 .../org/apache/rocketmq/store/MessageStore.java    | 15 +++-
 .../store/plugin/AbstractPluginMessageStore.java   |  9 ++-
 4 files changed, 77 insertions(+), 30 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 24162022c..1723923d3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.io.UnsupportedEncodingException;
@@ -524,8 +525,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         
this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
         
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
         
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
-        this.brokerController.getMessageStore()
-            
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
+        
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) 
{
             
this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 3cf8efdfa..2b829637a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store;
 
+import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import io.openmessaging.storage.dledger.entry.DLedgerEntry;
 import io.opentelemetry.api.common.AttributesBuilder;
@@ -35,11 +36,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -1295,35 +1294,69 @@ public class DefaultMessageStore implements 
MessageStore {
         return this.systemClock.now();
     }
 
+    /**
+     * Lazy clean queue offset table.
+     * If offset table is cleaned, and old messages are dispatching after the 
old consume queue is cleaned,
+     * consume queue will be created with old offset, then later message with 
new offset table can not be
+     * dispatched to consume queue.
+     */
     @Override
-    public int cleanUnusedTopic(Set<String> topics) {
-        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> 
it = this.getConsumeQueueTable().entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next 
= it.next();
-            String topic = next.getKey();
+    public int deleteTopics(final Set<String> deleteTopics) {
+        if (deleteTopics == null || deleteTopics.isEmpty()) {
+            return 0;
+        }
 
-            if (!topics.contains(topic) && 
!TopicValidator.isSystemTopic(topic) && !MixAll.isLmq(topic)) {
-                ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = 
next.getValue();
-                for (ConsumeQueueInterface cq : queueTable.values()) {
-                    this.consumeQueueStore.destroy(cq);
-                    LOGGER.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
-                        cq.getTopic(),
-                        cq.getQueueId()
-                    );
+        int deleteCount = 0;
+        for (String topic : deleteTopics) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> queueTable =
+                this.consumeQueueStore.getConsumeQueueTable().get(topic);
 
-                    
this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), cq.getQueueId());
-                }
-                it.remove();
+            if (queueTable == null || queueTable.isEmpty()) {
+                continue;
+            }
 
-                if (this.brokerConfig.isAutoDeleteUnusedStats()) {
-                    this.brokerStatsManager.onTopicDeleted(topic);
-                }
+            for (ConsumeQueueInterface cq : queueTable.values()) {
+                this.consumeQueueStore.destroy(cq);
+                LOGGER.info("DeleteTopic: ConsumeQueue has been cleaned, 
topic={}, queueId={}",
+                    cq.getTopic(), cq.getQueueId());
+                this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), 
cq.getQueueId());
+            }
 
-                LOGGER.info("cleanUnusedTopic: {},topic destroyed", topic);
+            if (this.brokerConfig.isAutoDeleteUnusedStats()) {
+                this.brokerStatsManager.onTopicDeleted(topic);
             }
+
+            // destroy consume queue dir
+            String consumeQueueDir = 
StorePathConfigHelper.getStorePathConsumeQueue(
+                this.messageStoreConfig.getStorePathRootDir()) + 
File.separator + topic;
+            String consumeQueueExtDir = 
StorePathConfigHelper.getStorePathConsumeQueue(
+                this.messageStoreConfig.getStorePathRootDir()) + 
File.separator + topic;
+            String batchConsumeQueueDir = 
StorePathConfigHelper.getStorePathBatchConsumeQueue(
+                this.messageStoreConfig.getStorePathRootDir()) + 
File.separator + topic;
+
+            UtilAll.deleteEmptyDirectory(new File(consumeQueueDir));
+            UtilAll.deleteEmptyDirectory(new File(consumeQueueExtDir));
+            UtilAll.deleteEmptyDirectory(new File(batchConsumeQueueDir));
+
+            LOGGER.info("DeleteTopic: Topic has been destroyed, topic={}", 
topic);
+            deleteCount++;
         }
+        return deleteCount;
+    }
 
-        return 0;
+    @Override
+    public int cleanUnusedTopic(final Set<String> retainTopics) {
+        Set<String> consumeQueueTopicSet = 
this.getConsumeQueueTable().keySet();
+        int deleteCount = 0;
+        for (String topicName : Sets.difference(retainTopics, 
consumeQueueTopicSet)) {
+            if (retainTopics.contains(topicName) ||
+                TopicValidator.isSystemTopic(topicName) ||
+                MixAll.isLmq(topicName)) {
+                continue;
+            }
+            deleteCount += this.deleteTopics(Sets.newHashSet(topicName));
+        }
+        return deleteCount;
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index bb596c844..bbf2056cc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -436,12 +436,21 @@ public interface MessageStore {
     long now();
 
     /**
-     * Clean unused topics.
+     * Delete topic's consume queue file and unused stats.
+     * This interface allows user delete system topic.
      *
-     * @param topics all valid topics.
+     * @param deleteTopics unused topic name set
+     * @return the number of the topics which has been deleted.
+     */
+    int deleteTopics(final Set<String> deleteTopics);
+
+    /**
+     * Clean unused topics which not in retain topic name set.
+     *
+     * @param retainTopics all valid topics.
      * @return number of the topics deleted.
      */
-    int cleanUnusedTopic(final Set<String> topics);
+    int cleanUnusedTopic(final Set<String> retainTopics);
 
     /**
      * Clean expired consume queues.
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index db752919b..47416a873 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -257,8 +257,13 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
     }
 
     @Override
-    public int cleanUnusedTopic(Set<String> topics) {
-        return next.cleanUnusedTopic(topics);
+    public int deleteTopics(final Set<String> deleteTopics) {
+        return next.deleteTopics(deleteTopics);
+    }
+
+    @Override
+    public int cleanUnusedTopic(final Set<String> retainTopics) {
+        return next.cleanUnusedTopic(retainTopics);
     }
 
     @Override

Reply via email to