This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch revert-1644-fix_1637 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 42fcc17f45040789e6bf452a837ffb797050d24e Author: Heng Du <[email protected]> AuthorDate: Fri Dec 13 20:05:52 2019 +0800 Revert "[ISSUE #1637]Fix 1637 (#1644)" This reverts commit f8f6fbe4aa7f5dee937e688322628c366b12a552. --- .../processor/AbstractSendMessageProcessor.java | 11 +-- .../broker/processor/AdminBrokerProcessor.java | 29 ++++---- .../rocketmq/broker/topic/TopicConfigManager.java | 4 ++ .../rocketmq/broker/topic/TopicValidator.java | 69 ------------------- .../rocketmq/broker/topic/TopicValidatorTest.java | 80 ---------------------- .../org/apache/rocketmq/client/Validators.java | 7 +- .../apache/rocketmq/client/impl/MQAdminImpl.java | 2 - .../rocketmq/client/impl/MQClientAPIImpl.java | 6 +- .../java/org/apache/rocketmq/common/UtilAll.java | 4 +- 9 files changed, 34 insertions(+), 178 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index adf0279..b0668d4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -16,16 +16,16 @@ */ package org.apache.rocketmq.broker.processor; -import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; import java.util.Map; import java.util.Random; + +import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; -import org.apache.rocketmq.broker.topic.TopicValidator; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -171,8 +171,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces + "] sending message is forbidden"); return response; } - - if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { + if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { + String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; + log.warn(errorMsg); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorMsg); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index df0ec90..d63cc20 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -38,18 +38,25 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; -import org.apache.rocketmq.broker.topic.TopicValidator; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -77,15 +84,10 @@ import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; -import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; -import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody; -import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; @@ -108,7 +110,6 @@ import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; @@ -117,8 +118,6 @@ import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.filter.util.BitsArray; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -259,10 +258,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { - return response; - } - try { response.setCode(ResponseCode.SUCCESS); response.setOpaque(request.getOpaque()); @@ -317,8 +312,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress()); accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm()); accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm()); - accessConfig.setTopicPerms(UtilAll.string2List(requestHeader.getTopicPerms(), ",")); - accessConfig.setGroupPerms(UtilAll.string2List(requestHeader.getGroupPerms(), ",")); + accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),",")); + accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),",")); accessConfig.setAdmin(requestHeader.isAdmin()); try { @@ -391,7 +386,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { try { AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); - if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.string2List(requestHeader.getGlobalWhiteAddrs(), ","))) { + if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) { response.setCode(ResponseCode.SUCCESS); response.setOpaque(request.getOpaque()); response.markResponseType(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 199b46d..cb29011 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -152,6 +152,10 @@ public class TopicConfigManager extends ConfigManager { return this.systemTopicList; } + public boolean isTopicCanSendMessage(final String topic) { + return !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); + } + public TopicConfig selectTopicConfig(final String topic) { return this.topicConfigTable.get(topic); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java deleted file mode 100644 index 8b53476..0000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.broker.topic; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -public class TopicValidator { - - private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; - private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); - private static final int CHARACTER_MAX_LENGTH = 255; - - private static boolean regularExpressionMatcher(String origin, Pattern pattern) { - if (pattern == null) { - return true; - } - Matcher matcher = pattern.matcher(origin); - return matcher.matches(); - } - - public static boolean validateTopic(String topic, RemotingCommand response) { - - if (UtilAll.isBlank(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is blank."); - return false; - } - - if (!regularExpressionMatcher(topic, PATTERN)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR); - return false; - } - - if (topic.length() > CHARACTER_MAX_LENGTH) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is longer than topic max length 255."); - return false; - } - - //whether the same with system reserved keyword - if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); - return false; - } - - return true; - } -} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java deleted file mode 100644 index 267931f..0000000 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.broker.topic; - -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.junit.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -public class TopicValidatorTest { - - @Test - public void testTopicValidator_NotPass() { - RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); - - Boolean res = TopicValidator.validateTopic("", response); - assertThat(res).isFalse(); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).contains("The specified topic is blank"); - - clearResponse(response); - res = TopicValidator.validateTopic("../TopicTest", response); - assertThat(res).isFalse(); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).contains("The specified topic contains illegal characters"); - - clearResponse(response); - res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response); - assertThat(res).isFalse(); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); - - clearResponse(response); - res = TopicValidator.validateTopic(generateString(255), response); - assertThat(res).isFalse(); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).contains("The specified topic is longer than topic max length 255."); - - } - - @Test - public void testTopicValidator_Pass() { - RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); - - Boolean res = TopicValidator.validateTopic("TestTopic", response); - assertThat(res).isTrue(); - assertThat(response.getCode()).isEqualTo(-1); - assertThat(response.getRemark()).isEmpty(); - } - - private static void clearResponse(RemotingCommand response) { - response.setCode(-1); - response.setRemark(""); - } - - private static String generateString(int length) { - StringBuilder stringBuffer = new StringBuilder(); - String tmpStr = "0123456789"; - for (int i = 0; i < length; i++) { - stringBuffer.append(tmpStr); - } - return stringBuffer.toString(); - } -} diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index a37a17b..1b96cd0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -77,6 +77,9 @@ public class Validators { return matcher.matches(); } + /** + * Validate message + */ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { @@ -100,6 +103,9 @@ public class Validators { } } + /** + * Validate topic + */ public static void checkTopic(String topic) throws MQClientException { if (UtilAll.isBlank(topic)) { throw new MQClientException("The specified topic is blank", null); @@ -121,5 +127,4 @@ public class Validators { String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null); } } - } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 9dbd552..ca89d61 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -28,7 +28,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.client.QueryResult; -import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -81,7 +80,6 @@ public class MQAdminImpl { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { try { - Validators.checkTopic(newTopic); TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis); List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas(); if (brokerDataList != null && !brokerDataList.isEmpty()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 9380f4b..1ad5fbf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -43,12 +43,12 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -305,8 +305,8 @@ public class MQClientAPIImpl { requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm()); requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm()); requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); - requestHeader.setTopicPerms(UtilAll.list2String(plainAccessConfig.getTopicPerms(), ",")); - requestHeader.setGroupPerms(UtilAll.list2String(plainAccessConfig.getGroupPerms(), ",")); + requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ",")); + requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ",")); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader); diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 624199c..222f697 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -580,7 +580,7 @@ public class UtilAll { } } - public static String list2String(List<String> list, String splitor) { + public static String List2String(List<String> list, String splitor) { if (list == null || list.size() == 0) { return null; } @@ -595,7 +595,7 @@ public class UtilAll { return str.toString(); } - public static List<String> string2List(String str, String splitor) { + public static List<String> String2List(String str, String splitor) { if (StringUtils.isEmpty(str)) { return null; }
