This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f82718ae3b [ISSUE #7208] fix: when deleting topic also delete its pop retry topic (#7209)
f82718ae3b is described below
commit f82718ae3b77a16b553c03f672dc971a2d5d48fa
Author: cnScarb <[email protected]>
AuthorDate: Thu Aug 31 15:50:10 2023 +0800
[ISSUE #7208] fix: when deleting topic also delete its pop retry topic (#7209)
---
.../broker/processor/AdminBrokerProcessor.java | 24 ++++++++++++---
.../broker/processor/AdminBrokerProcessorTest.java | 36 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index bbddcec2d7..8fbcd3c94f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
@@ -542,16 +543,29 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
}
- this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
- this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
- this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
- this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
- this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
+ final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
+ // delete pop retry topics first
+ for (String group : groups) {
+ final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
+ deleteTopicInBroker(popRetryTopic);
+ }
+ }
+ // delete topic
+ deleteTopicInBroker(topic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
+ private void deleteTopicInBroker(String topic) {
+ this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
+ this.brokerController.getTopicQueueMappingManager().delete(topic);
+ this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
+ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
+ this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
+ }
+
private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index d33a217f76..9d17011b61 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.broker.BrokerController;
@@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
@@ -90,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -321,6 +326,37 @@ public class AdminBrokerProcessorTest {
"please execute it from master broker.");
}
+ @Test
+ public void testDeleteWithPopRetryTopic() throws Exception {
+ String topic = "topicA";
+ String anotherTopic = "another_topicA";
+
+ topicConfigManager = mock(TopicConfigManager.class);
+ when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+ final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+ topicConfigTable.put(topic, new TopicConfig());
+ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig());
+
+ topicConfigTable.put(anotherTopic, new TopicConfig());
+ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig());
+ when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
+ when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> {
+ final String selectTopic = invocation.getArgument(0);
+ return topicConfigManager.getTopicConfigTable().get(selectTopic);
+ });
+
+ when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1"));
+
+ RemotingCommand request = buildDeleteTopicRequest(topic);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ verify(topicConfigManager).deleteTopicConfig(topic);
+ verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1"));
+ verify(messageStore, times(2)).deleteTopics(anySet());
+ }
+
@Test
public void testGetAllTopicConfigInRocksdb() throws Exception {
if (notToBeExecuted()) {