This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f82718ae3b [ISSUE #7208] fix: when deleting topic also delete its pop retry topic (#7209)
f82718ae3b is described below

commit f82718ae3b77a16b553c03f672dc971a2d5d48fa
Author: cnScarb <[email protected]>
AuthorDate: Thu Aug 31 15:50:10 2023 +0800

    [ISSUE #7208] fix: when deleting topic also delete its pop retry topic (#7209)
---
 .../broker/processor/AdminBrokerProcessor.java     | 24 ++++++++++++---
 .../broker/processor/AdminBrokerProcessorTest.java | 36 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index bbddcec2d7..8fbcd3c94f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.LockCallback;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -542,16 +543,29 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             }
         }
 
-        this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
-        this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
-        this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
-        this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
-        this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
+        final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
+        // delete pop retry topics first
+        for (String group : groups) {
+            final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+            if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
+                deleteTopicInBroker(popRetryTopic);
+            }
+        }
+        // delete topic
+        deleteTopicInBroker(topic);
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
     }
 
+    private void deleteTopicInBroker(String topic) {
+        this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
+        this.brokerController.getTopicQueueMappingManager().delete(topic);
+        this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
+        this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
+        this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
+    }
+
     private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index d33a217f76..9d17011b61 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.rocketmq.broker.BrokerController;
@@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
@@ -90,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -321,6 +326,37 @@ public class AdminBrokerProcessorTest {
             "please execute it from master broker.");
     }
 
+    @Test
+    public void testDeleteWithPopRetryTopic() throws Exception {
+        String topic = "topicA";
+        String anotherTopic = "another_topicA";
+
+        topicConfigManager = mock(TopicConfigManager.class);
+        when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+        final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+        topicConfigTable.put(topic, new TopicConfig());
+        topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig());
+
+        topicConfigTable.put(anotherTopic, new TopicConfig());
+        topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig());
+        when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
+        when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> {
+            final String selectTopic = invocation.getArgument(0);
+            return topicConfigManager.getTopicConfigTable().get(selectTopic);
+        });
+
+        when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+        when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1"));
+
+        RemotingCommand request = buildDeleteTopicRequest(topic);
+        RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        verify(topicConfigManager).deleteTopicConfig(topic);
+        verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1"));
+        verify(messageStore, times(2)).deleteTopics(anySet());
+    }
+
     @Test
     public void testGetAllTopicConfigInRocksdb() throws Exception {
         if (notToBeExecuted()) {

Reply via email to