This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 438077e7ad Limit group max length to 120. (#9563)
438077e7ad is described below
commit 438077e7ad09495f6418542b77c83787ae286223
Author: Jixiang Jin <[email protected]>
AuthorDate: Tue Jul 22 17:51:50 2025 +0800
Limit group max length to 120. (#9563)
* Limit group length to 120, because the max length of a pop retry topic is 255.
* Add unit test for validating group.
* Fix unit test for validating gRPC group, limit length to 120
---
.../processor/AbstractSendMessageProcessor.java | 2 +-
.../broker/processor/AdminBrokerProcessor.java | 39 ++++++++++++------
.../subscription/SubscriptionGroupManager.java | 23 +++--------
.../org/apache/rocketmq/client/Validators.java | 10 +++--
.../rocketmq/common/topic/TopicValidator.java | 48 +++++++++++++++++-----
.../rocketmq/common/topic/TopicValidatorTest.java | 39 +++++++++++++-----
.../proxy/grpc/v2/AbstractMessingActivityTest.java | 2 +-
7 files changed, 108 insertions(+), 55 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 16c137dd2e..928bd397e1 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -472,7 +472,7 @@ public abstract class AbstractSendMessageProcessor
implements NettyRequestProces
return response;
}
- TopicValidator.ValidateTopicResult result =
TopicValidator.validateTopic(requestHeader.getTopic());
+ TopicValidator.ValidateResult result =
TopicValidator.validateTopic(requestHeader.getTopic());
if (!result.isValid()) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark(result.getRemark());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 87c7d24ca8..4eb78fc1c2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -556,7 +556,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
long executionTime;
try {
- TopicValidator.ValidateTopicResult result =
TopicValidator.validateTopic(topic);
+ TopicValidator.ValidateResult result =
TopicValidator.validateTopic(topic);
if (!result.isValid()) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark(result.getRemark());
@@ -646,7 +646,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
// Valid topics
for (TopicConfig topicConfig : topicConfigList) {
String topic = topicConfig.getTopicName();
- TopicValidator.ValidateTopicResult result =
TopicValidator.validateTopic(topic);
+ TopicValidator.ValidateResult result =
TopicValidator.validateTopic(topic);
if (!result.isValid()) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark(result.getRemark());
@@ -716,7 +716,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
String topic = requestHeader.getTopic();
- TopicValidator.ValidateTopicResult result =
TopicValidator.validateTopic(topic);
+ TopicValidator.ValidateResult result =
TopicValidator.validateTopic(topic);
if (!result.isValid()) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark(result.getRemark());
@@ -1532,14 +1532,23 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
SubscriptionGroupConfig config =
RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
- if (config != null) {
+ if (null != config) {
+ TopicValidator.ValidateResult result =
TopicValidator.validateGroup(config.getGroupName());
+ if (!result.isValid()) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark(result.getRemark());
+ return response;
+ }
+
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
long executionTime = System.currentTimeMillis() - startTime;
- LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms",
config.getGroupName(), executionTime);
+ if (null != config) {
+ LOGGER.info("executionTime of create subscriptionGroup:{} is {}
ms", config.getGroupName(), executionTime);
+ }
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
@@ -1551,12 +1560,19 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private RemotingCommand
updateAndCreateSubscriptionGroupList(ChannelHandlerContext ctx, RemotingCommand
request) {
final long startTime = System.nanoTime();
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
final SubscriptionGroupList subscriptionGroupList =
SubscriptionGroupList.decode(request.getBody(), SubscriptionGroupList.class);
final List<SubscriptionGroupConfig> groupConfigList =
subscriptionGroupList.getGroupConfigList();
final StringBuilder builder = new StringBuilder();
for (SubscriptionGroupConfig config : groupConfigList) {
+ TopicValidator.ValidateResult result =
TopicValidator.validateGroup(config.getGroupName());
+ if (!result.isValid()) {
+ response.setCode(ResponseCode.INVALID_PARAMETER);
+ response.setRemark(result.getRemark());
+ return response;
+ }
builder.append(config.getGroupName()).append(";");
}
final String groupNames = builder.toString();
@@ -1564,7 +1580,6 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
groupNames,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
try {
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList);
response.setCode(ResponseCode.SUCCESS);
@@ -2058,13 +2073,13 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
/**
* Reset consumer offset.
*
- * @param topic Required, not null.
- * @param group Required, not null.
- * @param queueId if target queue ID is negative, all message queues will
be reset; otherwise, only the target queue
- * would get reset.
+ * @param topic Required, not null.
+ * @param group Required, not null.
+ * @param queueId if target queue ID is negative, all message queues
will be reset; otherwise, only the target queue
+ * would get reset.
* @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being; otherwise,
- * binary search is performed to locate target offset.
- * @param offset Target offset to reset to if target queue ID is properly
provided.
+ * binary search is performed to locate target offset.
+ * @param offset Target offset to reset to if target queue ID is
properly provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int
queueId, long timestamp, Long offset) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index c7083365be..e860f29074 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -17,8 +17,9 @@
package org.apache.rocketmq.broker.subscription;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -27,15 +28,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
-
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
-import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
@@ -48,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+@SuppressWarnings("Duplicates")
public class SubscriptionGroupManager extends ConfigManager {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -191,10 +189,6 @@ public class SubscriptionGroupManager extends
ConfigManager {
/**
* set the bit value to 1 at the specific index (from 0)
- *
- * @param group
- * @param topic
- * @param forbiddenIndex from 0
*/
public void setForbidden(String group, String topic, int forbiddenIndex) {
int topicForbidden = getForbidden(group, topic);
@@ -204,10 +198,6 @@ public class SubscriptionGroupManager extends
ConfigManager {
/**
* clear the bit value to 0 at the specific index (from 0)
- *
- * @param group
- * @param topic
- * @param forbiddenIndex from 0
*/
public void clearForbidden(String group, String topic, int forbiddenIndex)
{
int topicForbidden = getForbidden(group, topic);
@@ -270,7 +260,8 @@ public class SubscriptionGroupManager extends ConfigManager
{
if (null == subscriptionGroupConfig) {
if
(brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup()
|| MixAll.isSysConsumerGroupAndEnableCreate(group,
brokerController.getBrokerConfig().isEnableCreateSysGroup())) {
- if (group.length() > Validators.CHARACTER_MAX_LENGTH ||
TopicValidator.isTopicOrGroupIllegal(group)) {
+ TopicValidator.ValidateResult result =
TopicValidator.validateGroup(group);
+ if (!result.isValid()) {
return null;
}
subscriptionGroupConfig = new SubscriptionGroupConfig();
@@ -319,9 +310,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
}
private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager
sgm) {
- Iterator<Entry<String, SubscriptionGroupConfig>> it =
sgm.getSubscriptionGroupTable().entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, SubscriptionGroupConfig> next = it.next();
+ for (Entry<String, SubscriptionGroupConfig> next :
sgm.getSubscriptionGroupTable().entrySet()) {
log.info("load exist subscription group, {}",
next.getValue().toString());
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java
b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 77e4bbd238..7f588d56ea 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -38,6 +38,11 @@ import static
org.apache.rocketmq.common.topic.TopicValidator.isTopicOrGroupIlle
public class Validators {
public static final int CHARACTER_MAX_LENGTH = 255;
public static final int TOPIC_MAX_LENGTH = 127;
+ /*
+ * Group name max length is 120, for it will be used to make up retry and
DLQ topic,
+ * like pull retry: %RETRY%group and pop retry: %RETRY%group_topic.
+ */
+ public static final int GROUP_MAX_LENGTH = 120;
/**
* Validate group
@@ -47,11 +52,10 @@ public class Validators {
throw new MQClientException("the specified group is blank", null);
}
- if (group.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("the specified group is longer than
group max length 255.", null);
+ if (group.length() > GROUP_MAX_LENGTH) {
+ throw new MQClientException(String.format("the specified group[%s]
is longer than group max length: %s.", group, GROUP_MAX_LENGTH), null);
}
-
if (isTopicOrGroupIllegal(group)) {
throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters,
allowing only %s", group,
diff --git
a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 1efb508664..47d45c6dfe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -39,12 +39,17 @@ public class TopicValidator {
public static final boolean[] VALID_CHAR_BIT_MAP = new boolean[128];
private static final int TOPIC_MAX_LENGTH = 127;
+ /*
+ * Group name max length is 120, for it will be used to make up retry and
DLQ topic,
+ * like pull retry: %RETRY%group and pop retry: %RETRY%group_topic.
+ */
+ private static final int GROUP_MAX_LENGTH = 120;
private static final int RETRY_OR_DLQ_TOPIC_MAX_LENGTH = 255;
private static final Set<String> SYSTEM_TOPIC_SET = new HashSet<>();
/**
- * Topics'set which client can not send msg!
+ * Topic set which client can not send msg!
*/
private static final Set<String> NOT_ALLOWED_SEND_TOPIC_SET = new
HashSet<>();
@@ -93,44 +98,65 @@ public class TopicValidator {
public static boolean isTopicOrGroupIllegal(String str) {
int strLen = str.length();
int len = VALID_CHAR_BIT_MAP.length;
- boolean[] bitMap = VALID_CHAR_BIT_MAP;
for (int i = 0; i < strLen; i++) {
char ch = str.charAt(i);
- if (ch >= len || !bitMap[ch]) {
+ if (ch >= len || !VALID_CHAR_BIT_MAP[ch]) {
return true;
}
}
return false;
}
- public static ValidateTopicResult validateTopic(String topic) {
+ public static ValidateResult validateTopic(String topic) {
if (UtilAll.isBlank(topic)) {
- return new ValidateTopicResult(false, "The specified topic is
blank.");
+ return new ValidateResult(false, "The specified topic is blank.");
}
if (isTopicOrGroupIllegal(topic)) {
- return new ValidateTopicResult(false, "The specified topic
contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$");
+ String falseRemark = "The specified topic: " + topic + ", contains
illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
+ return new ValidateResult(false, falseRemark);
}
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ||
topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
if (topic.length() > RETRY_OR_DLQ_TOPIC_MAX_LENGTH) {
- return new ValidateTopicResult(false, "The specified topic is
longer than topic max length.");
+ String falseRemark = "The specified topic is DLQ or Retry
topic: " + topic + ", and it's longer than topic max length: " +
RETRY_OR_DLQ_TOPIC_MAX_LENGTH;
+ return new ValidateResult(false, falseRemark);
}
} else {
if (topic.length() > TOPIC_MAX_LENGTH) {
- return new ValidateTopicResult(false, "The specified topic is
longer than topic max length.");
+ String falseRemark = "The specified topic: " + topic + ", is
longer than topic max length: " + TOPIC_MAX_LENGTH;
+ return new ValidateResult(false, falseRemark);
}
}
- return new ValidateTopicResult(true, "");
+ return new ValidateResult(true, "");
+ }
+
+ public static ValidateResult validateGroup(String group) {
+
+ if (UtilAll.isBlank(group)) {
+ return new ValidateResult(false, "The specified group is blank.");
+ }
+
+ if (isTopicOrGroupIllegal(group)) {
+ String falseRemark = "The specified group: " + group + ", contains
illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
+ return new ValidateResult(false, falseRemark);
+ }
+
+ if (group.length() > GROUP_MAX_LENGTH) {
+ String falseRemark = "The specified group: " + group + ", is
longer than group max length: " + GROUP_MAX_LENGTH;
+ return new ValidateResult(false, falseRemark);
+ }
+
+ return new ValidateResult(true, "");
}
- public static class ValidateTopicResult {
+ public static class ValidateResult {
private final boolean valid;
private final String remark;
- public ValidateTopicResult(boolean valid, String remark) {
+ public ValidateResult(boolean valid, String remark) {
this.valid = valid;
this.remark = remark;
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
index 1655eaab2b..0541c7f6d2 100644
---
a/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
+++
b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
@@ -24,34 +24,26 @@ public class TopicValidatorTest {
@Test
public void testTopicValidator_NotPass() {
- TopicValidator.ValidateTopicResult res =
TopicValidator.validateTopic("");
+ TopicValidator.ValidateResult res = TopicValidator.validateTopic("");
assertThat(res.isValid()).isFalse();
assertThat(res.getRemark()).contains("The specified topic is blank");
res = TopicValidator.validateTopic("../TopicTest");
assertThat(res.isValid()).isFalse();
- assertThat(res.getRemark()).contains("The specified topic contains
illegal characters");
res = TopicValidator.validateTopic(generateString(128));
assertThat(res.isValid()).isFalse();
- assertThat(res.getRemark()).contains("The specified topic is longer
than topic max length.");
-
- res = TopicValidator.validateTopic(generateString2(128));
- assertThat(res.isValid()).isFalse();
- assertThat(res.getRemark()).contains("The specified topic is longer
than topic max length.");
res = TopicValidator.validateTopic(generateRetryTopic(256));
assertThat(res.isValid()).isFalse();
- assertThat(res.getRemark()).contains("The specified topic is longer
than topic max length.");
res = TopicValidator.validateTopic(generateDlqTopic(256));
assertThat(res.isValid()).isFalse();
- assertThat(res.getRemark()).contains("The specified topic is longer
than topic max length.");
}
@Test
public void testTopicValidator_Pass() {
- TopicValidator.ValidateTopicResult res =
TopicValidator.validateTopic("TestTopic");
+ TopicValidator.ValidateResult res =
TopicValidator.validateTopic("TestTopic");
assertThat(res.isValid()).isTrue();
assertThat(res.getRemark()).isEmpty();
@@ -68,6 +60,33 @@ public class TopicValidatorTest {
assertThat(res.getRemark()).isEmpty();
}
+ @Test
+ public void testGroupValidator_Pass() {
+ TopicValidator.ValidateResult res =
TopicValidator.validateGroup("TestGroup");
+ assertThat(res.isValid()).isTrue();
+ assertThat(res.getRemark()).isEmpty();
+
+ res = TopicValidator.validateGroup(generateString2(120));
+ assertThat(res.isValid()).isTrue();
+ assertThat(res.getRemark()).isEmpty();
+ }
+
+ @Test
+ public void testGroupValidator__NotPass() {
+ TopicValidator.ValidateResult res = TopicValidator.validateGroup("");
+ assertThat(res.isValid()).isFalse();
+ assertThat(res.getRemark()).contains("The specified group is blank");
+
+ res = TopicValidator.validateGroup("../GroupTest");
+ assertThat(res.isValid()).isFalse();
+
+ res = TopicValidator.validateGroup(generateString(120));
+ assertThat(res.isValid()).isFalse();
+
+ res = TopicValidator.validateGroup(generateString2(121));
+ assertThat(res.isValid()).isFalse();
+ }
+
@Test
public void testAddSystemTopic() {
String topic = "SYSTEM_TOPIC_TEST";
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
index 3c2967357f..f31a95770c 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
@@ -64,7 +64,7 @@ public class AbstractMessingActivityTest extends
InitConfigTest {
assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(MixAll.CID_SYS_RMQ_TRANS).build()));
assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().setName("@").build()));
assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(256)).build()));
-
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(255)).build());
+
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(120)).build());
}
private static String createString(int len) {