wang-jiahua commented on code in PR #10448:
URL: https://github.com/apache/rocketmq/pull/10448#discussion_r3386366617
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -816,6 +814,84 @@ private synchronized RemotingCommand
deleteTopic(ChannelHandlerContext ctx,
return response;
}
+ private synchronized RemotingCommand deleteTopicList(ChannelHandlerContext
ctx,
+ RemotingCommand request) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+
+ DeleteTopicListRequestBody requestBody =
DeleteTopicListRequestBody.decode(
+ request.getBody(), DeleteTopicListRequestBody.class);
+ List<String> topicList = requestBody == null ? null :
requestBody.getTopicList();
+
+ if (CollectionUtils.isEmpty(topicList)) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark("The specified topic list is blank.");
+ return response;
+ }
+
+ LOGGER.info("AdminBrokerProcessor#deleteTopicList: broker receive
request to delete topics={}, caller={}",
+ topicList, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ boolean validateSystemTopic =
brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic();
+ // dedup while preserving the input order
+ Set<String> topicsToClean = new LinkedHashSet<>();
+ for (String topic : topicList) {
+ if (UtilAll.isBlank(topic)) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark("The specified topic is blank.");
+ return response;
+ }
+ if (validateSystemTopic && TopicValidator.isSystemTopic(topic)) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark("The topic[" + topic + "] is conflict with
system topic.");
+ return response;
+ }
+ topicsToClean.add(topic);
+ }
+
+ if
(brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
+ // snapshot the inputs before mutating the set, so retry topics
for already-added retry topics are not collected
+ for (String topic : new ArrayList<>(topicsToClean)) {
+ collectPopRetryTopics(topic, topicsToClean);
+ }
+ }
+
+ try {
+ for (String topic : topicsToClean) {
+ if (LiteMetadataUtil.isLiteMessageType(topic,
brokerController)) {
+
brokerController.getLiteLifecycleManager().cleanByParentTopic(topic);
+ }
+ }
+ // batch delete topic config in one persist
+
this.brokerController.getTopicConfigManager().deleteTopicConfigList(new
ArrayList<>(topicsToClean));
Review Comment:
Fixed in 6bebee2c9. Moved `deleteTopicConfigList` after all other cleanup
steps (queue mapping, offsets, inflight counter, timer metrics, message store).
This way, if the batch fails partway through and the caller retries,
`collectPopRetryTopics` → `selectTopicConfig` can still find the configs and
rediscover derived POP retry topics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]