RongtongJin commented on code in PR #10448:
URL: https://github.com/apache/rocketmq/pull/10448#discussion_r3386169067
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -816,6 +814,84 @@ private synchronized RemotingCommand
deleteTopic(ChannelHandlerContext ctx,
return response;
}
+ private synchronized RemotingCommand deleteTopicList(ChannelHandlerContext
ctx,
+ RemotingCommand request) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+
+ DeleteTopicListRequestBody requestBody =
DeleteTopicListRequestBody.decode(
+ request.getBody(), DeleteTopicListRequestBody.class);
+ List<String> topicList = requestBody == null ? null :
requestBody.getTopicList();
+
+ if (CollectionUtils.isEmpty(topicList)) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark("The specified topic list is blank.");
+ return response;
+ }
+
+ LOGGER.info("AdminBrokerProcessor#deleteTopicList: broker receive
request to delete topics={}, caller={}",
+ topicList, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ boolean validateSystemTopic =
brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic();
+ // dedup while preserving the input order
+ Set<String> topicsToClean = new LinkedHashSet<>();
+ for (String topic : topicList) {
+ if (UtilAll.isBlank(topic)) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark("The specified topic is blank.");
+ return response;
+ }
+ if (validateSystemTopic && TopicValidator.isSystemTopic(topic)) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark("The topic[" + topic + "] is conflict with
system topic.");
+ return response;
+ }
+ topicsToClean.add(topic);
+ }
+
+ if
(brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
+ // snapshot the inputs before mutating the set, so retry topics
for already-added retry topics are not collected
+ for (String topic : new ArrayList<>(topicsToClean)) {
+ collectPopRetryTopics(topic, topicsToClean);
+ }
+ }
+
+ try {
+ for (String topic : topicsToClean) {
+ if (LiteMetadataUtil.isLiteMessageType(topic,
brokerController)) {
+
brokerController.getLiteLifecycleManager().cleanByParentTopic(topic);
+ }
+ }
+ // batch delete topic config in one persist
+
this.brokerController.getTopicConfigManager().deleteTopicConfigList(new
ArrayList<>(topicsToClean));
Review Comment:
Consider the partial-failure retry case here. After this call removes the
topic configs, a retry of the same batch rebuilds `topicsToClean` from the
original request and `collectPopRetryTopics()`, but that helper only adds POP
retry topics whose configs still exist. If the first attempt fails after this
config deletion but before queue mapping / offset / timer metrics / message
store cleanup finishes, those retry topics may not be rediscovered on retry, so
the remaining cleanup for them can be skipped. Please make the delete plan
retry-stable, or explicitly handle/document this partial-failure behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]