absolute8511 commented on code in PR #8267:
URL: https://github.com/apache/rocketmq/pull/8267#discussion_r1708497551
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -536,6 +539,84 @@ private synchronized RemotingCommand
updateAndCreateTopic(ChannelHandlerContext
return response;
}
+ private synchronized RemotingCommand
updateAndCreateTopicList(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ long startTime = System.currentTimeMillis();
+
+ final CreateTopicListRequestBody requestBody =
CreateTopicListRequestBody.decode(request.getBody(),
CreateTopicListRequestBody.class);
+ List<TopicConfig> topicConfigList = requestBody.getTopicConfigList();
+
+ StringBuilder builder = new StringBuilder();
+ for (TopicConfig topicConfig : topicConfigList) {
+ builder.append(topicConfig.getTopicName()).append(";");
+ }
+ String topicNames = builder.toString();
+ LOGGER.info("AdminBrokerProcessor#updateAndCreateTopicList:
topicNames: {}, called by {}", topicNames,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+
+ long executionTime;
+
+ try {
+ // Valid topics
+ for (TopicConfig topicConfig : topicConfigList) {
+ String topic = topicConfig.getTopicName();
+ TopicValidator.ValidateTopicResult result =
TopicValidator.validateTopic(topic);
+ if (!result.isValid()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(result.getRemark());
+ return response;
+ }
+ if
(brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
+ if (TopicValidator.isSystemTopic(topic)) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("The topic[" + topic + "] is
conflict with system topic.");
+ return response;
+ }
+ }
+ if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+ &&
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("MIXED message type is not supported.");
+ return response;
+ }
+ if
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
{
+ LOGGER.info("Broker receive request to update or create
topic={}, but topicConfig has no changes , so idempotent, caller address={}",
+ topic,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ response.setCode(ResponseCode.SUCCESS);
+ return response;
Review Comment:
We should `continue` here instead of returning; otherwise the remaining
topics in the list will be left uncreated even though we return success.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org