This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 438077e7ad Limit group max length to 120. (#9563)
438077e7ad is described below

commit 438077e7ad09495f6418542b77c83787ae286223
Author: Jixiang Jin <[email protected]>
AuthorDate: Tue Jul 22 17:51:50 2025 +0800

    Limit group max length to 120. (#9563)
    
    * limit group length to 120 for max length for pop retry topic is 255.
    * Add unit test for validating group.
    * Fix unit test for validating gRPC group, limit length to 120
---
 .../processor/AbstractSendMessageProcessor.java    |  2 +-
 .../broker/processor/AdminBrokerProcessor.java     | 39 ++++++++++++------
 .../subscription/SubscriptionGroupManager.java     | 23 +++--------
 .../org/apache/rocketmq/client/Validators.java     | 10 +++--
 .../rocketmq/common/topic/TopicValidator.java      | 48 +++++++++++++++++-----
 .../rocketmq/common/topic/TopicValidatorTest.java  | 39 +++++++++++++-----
 .../proxy/grpc/v2/AbstractMessingActivityTest.java |  2 +-
 7 files changed, 108 insertions(+), 55 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 16c137dd2e..928bd397e1 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -472,7 +472,7 @@ public abstract class AbstractSendMessageProcessor 
implements NettyRequestProces
             return response;
         }
 
-        TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(requestHeader.getTopic());
+        TopicValidator.ValidateResult result = 
TopicValidator.validateTopic(requestHeader.getTopic());
         if (!result.isValid()) {
             response.setCode(ResponseCode.INVALID_PARAMETER);
             response.setRemark(result.getRemark());
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 87c7d24ca8..4eb78fc1c2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -556,7 +556,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
         long executionTime;
         try {
-            TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
+            TopicValidator.ValidateResult result = 
TopicValidator.validateTopic(topic);
             if (!result.isValid()) {
                 response.setCode(ResponseCode.INVALID_PARAMETER);
                 response.setRemark(result.getRemark());
@@ -646,7 +646,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             // Valid topics
             for (TopicConfig topicConfig : topicConfigList) {
                 String topic = topicConfig.getTopicName();
-                TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
+                TopicValidator.ValidateResult result = 
TopicValidator.validateTopic(topic);
                 if (!result.isValid()) {
                     response.setCode(ResponseCode.INVALID_PARAMETER);
                     response.setRemark(result.getRemark());
@@ -716,7 +716,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
         String topic = requestHeader.getTopic();
 
-        TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
+        TopicValidator.ValidateResult result = 
TopicValidator.validateTopic(topic);
         if (!result.isValid()) {
             response.setCode(ResponseCode.INVALID_PARAMETER);
             response.setRemark(result.getRemark());
@@ -1532,14 +1532,23 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
         SubscriptionGroupConfig config = 
RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
-        if (config != null) {
+        if (null != config) {
+            TopicValidator.ValidateResult result = 
TopicValidator.validateGroup(config.getGroupName());
+            if (!result.isValid()) {
+                response.setCode(ResponseCode.INVALID_PARAMETER);
+                response.setRemark(result.getRemark());
+                return response;
+            }
+
             
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
         }
 
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         long executionTime = System.currentTimeMillis() - startTime;
-        LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", 
config.getGroupName(), executionTime);
+        if (null != config) {
+            LOGGER.info("executionTime of create subscriptionGroup:{} is {} 
ms", config.getGroupName(), executionTime);
+        }
         InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
             InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
         Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
@@ -1551,12 +1560,19 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
     private RemotingCommand 
updateAndCreateSubscriptionGroupList(ChannelHandlerContext ctx, RemotingCommand 
request) {
         final long startTime = System.nanoTime();
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
         final SubscriptionGroupList subscriptionGroupList = 
SubscriptionGroupList.decode(request.getBody(), SubscriptionGroupList.class);
         final List<SubscriptionGroupConfig> groupConfigList = 
subscriptionGroupList.getGroupConfigList();
 
         final StringBuilder builder = new StringBuilder();
         for (SubscriptionGroupConfig config : groupConfigList) {
+            TopicValidator.ValidateResult result = 
TopicValidator.validateGroup(config.getGroupName());
+            if (!result.isValid()) {
+                response.setCode(ResponseCode.INVALID_PARAMETER);
+                response.setRemark(result.getRemark());
+                return response;
+            }
             builder.append(config.getGroupName()).append(";");
         }
         final String groupNames = builder.toString();
@@ -1564,7 +1580,6 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             groupNames,
             RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
-        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         try {
             
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList);
             response.setCode(ResponseCode.SUCCESS);
@@ -2058,13 +2073,13 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     /**
      * Reset consumer offset.
      *
-     * @param topic Required, not null.
-     * @param group Required, not null.
-     * @param queueId if target queue ID is negative, all message queues will 
be reset; otherwise, only the target queue
-     * would get reset.
+     * @param topic     Required, not null.
+     * @param group     Required, not null.
+     * @param queueId   if target queue ID is negative, all message queues 
will be reset; otherwise, only the target queue
+     *                  would get reset.
      * @param timestamp if timestamp is negative, offset would be reset to 
broker offset at the time being; otherwise,
-     * binary search is performed to locate target offset.
-     * @param offset Target offset to reset to if target queue ID is properly 
provided.
+     *                  binary search is performed to locate target offset.
+     * @param offset    Target offset to reset to if target queue ID is 
properly provided.
      * @return Affected queues and their new offset
      */
     private RemotingCommand resetOffsetInner(String topic, String group, int 
queueId, long timestamp, Long offset) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index c7083365be..e860f29074 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -17,8 +17,9 @@
 package org.apache.rocketmq.broker.subscription;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,15 +28,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
-
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
-import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.SubscriptionGroupAttributes;
@@ -48,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
+@SuppressWarnings("Duplicates")
 public class SubscriptionGroupManager extends ConfigManager {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -191,10 +189,6 @@ public class SubscriptionGroupManager extends 
ConfigManager {
 
     /**
      * set the bit value to 1 at the specific index (from 0)
-     *
-     * @param group
-     * @param topic
-     * @param forbiddenIndex from 0
      */
     public void setForbidden(String group, String topic, int forbiddenIndex) {
         int topicForbidden = getForbidden(group, topic);
@@ -204,10 +198,6 @@ public class SubscriptionGroupManager extends 
ConfigManager {
 
     /**
      * clear the bit value to 0 at the specific index (from 0)
-     *
-     * @param group
-     * @param topic
-     * @param forbiddenIndex from 0
      */
     public void clearForbidden(String group, String topic, int forbiddenIndex) 
{
         int topicForbidden = getForbidden(group, topic);
@@ -270,7 +260,8 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         if (null == subscriptionGroupConfig) {
             if 
(brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup()
                     || MixAll.isSysConsumerGroupAndEnableCreate(group, 
brokerController.getBrokerConfig().isEnableCreateSysGroup())) {
-                if (group.length() > Validators.CHARACTER_MAX_LENGTH || 
TopicValidator.isTopicOrGroupIllegal(group)) {
+                TopicValidator.ValidateResult result = 
TopicValidator.validateGroup(group);
+                if (!result.isValid()) {
                     return null;
                 }
                 subscriptionGroupConfig = new SubscriptionGroupConfig();
@@ -319,9 +310,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
     }
 
     private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager 
sgm) {
-        Iterator<Entry<String, SubscriptionGroupConfig>> it = 
sgm.getSubscriptionGroupTable().entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, SubscriptionGroupConfig> next = it.next();
+        for (Entry<String, SubscriptionGroupConfig> next : 
sgm.getSubscriptionGroupTable().entrySet()) {
             log.info("load exist subscription group, {}", 
next.getValue().toString());
         }
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java 
b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 77e4bbd238..7f588d56ea 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -38,6 +38,11 @@ import static 
org.apache.rocketmq.common.topic.TopicValidator.isTopicOrGroupIlle
 public class Validators {
     public static final int CHARACTER_MAX_LENGTH = 255;
     public static final int TOPIC_MAX_LENGTH = 127;
+    /*
+     * Group name max length is 120, for it will be used to make up retry and 
DLQ topic,
+     * like pull retry: %RETRY%group_topic and pop retry: %RETRY%group_topic.
+     */
+    public static final int GROUP_MAX_LENGTH = 120;
 
     /**
      * Validate group
@@ -47,11 +52,10 @@ public class Validators {
             throw new MQClientException("the specified group is blank", null);
         }
 
-        if (group.length() > CHARACTER_MAX_LENGTH) {
-            throw new MQClientException("the specified group is longer than 
group max length 255.", null);
+        if (group.length() > GROUP_MAX_LENGTH) {
+            throw new MQClientException(String.format("the specified group[%s] 
is longer than group max length: %s.", group, GROUP_MAX_LENGTH), null);
         }
 
-
         if (isTopicOrGroupIllegal(group)) {
             throw new MQClientException(String.format(
                     "the specified group[%s] contains illegal characters, 
allowing only %s", group,
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java 
b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 1efb508664..47d45c6dfe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -39,12 +39,17 @@ public class TopicValidator {
 
     public static final boolean[] VALID_CHAR_BIT_MAP = new boolean[128];
     private static final int TOPIC_MAX_LENGTH = 127;
+    /*
+     * Group name max length is 120, for it will be used to make up retry and 
DLQ topic,
+     * like pull retry: %RETRY%group_topic and pop retry: %RETRY%group_topic.
+     */
+    private static final int GROUP_MAX_LENGTH = 120;
     private static final int RETRY_OR_DLQ_TOPIC_MAX_LENGTH = 255;
 
     private static final Set<String> SYSTEM_TOPIC_SET = new HashSet<>();
 
     /**
-     * Topics'set which client can not send msg!
+     * Topic set which client can not send msg!
      */
     private static final Set<String> NOT_ALLOWED_SEND_TOPIC_SET = new 
HashSet<>();
 
@@ -93,44 +98,65 @@ public class TopicValidator {
     public static boolean isTopicOrGroupIllegal(String str) {
         int strLen = str.length();
         int len = VALID_CHAR_BIT_MAP.length;
-        boolean[] bitMap = VALID_CHAR_BIT_MAP;
         for (int i = 0; i < strLen; i++) {
             char ch = str.charAt(i);
-            if (ch >= len || !bitMap[ch]) {
+            if (ch >= len || !VALID_CHAR_BIT_MAP[ch]) {
                 return true;
             }
         }
         return false;
     }
 
-    public static ValidateTopicResult validateTopic(String topic) {
+    public static ValidateResult validateTopic(String topic) {
 
         if (UtilAll.isBlank(topic)) {
-            return new ValidateTopicResult(false, "The specified topic is 
blank.");
+            return new ValidateResult(false, "The specified topic is blank.");
         }
 
         if (isTopicOrGroupIllegal(topic)) {
-            return new ValidateTopicResult(false, "The specified topic 
contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$");
+            String falseRemark = "The specified topic: " + topic + ", contains 
illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
+            return new ValidateResult(false, falseRemark);
         }
 
         if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || 
topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
             if (topic.length() > RETRY_OR_DLQ_TOPIC_MAX_LENGTH) {
-                return new ValidateTopicResult(false, "The specified topic is 
longer than topic max length.");
+                String falseRemark = "The specified topic is DLQ or Retry 
topic: " + topic + ", and it's longer than topic max length: " + 
RETRY_OR_DLQ_TOPIC_MAX_LENGTH;
+                return new ValidateResult(false, falseRemark);
             }
         } else {
             if (topic.length() > TOPIC_MAX_LENGTH) {
-                return new ValidateTopicResult(false, "The specified topic is 
longer than topic max length.");
+                String falseRemark = "The specified topic: " + topic + ", is 
longer than topic max length: " + TOPIC_MAX_LENGTH;
+                return new ValidateResult(false, falseRemark);
             }
         }
 
-        return new ValidateTopicResult(true, "");
+        return new ValidateResult(true, "");
+    }
+
+    public static ValidateResult validateGroup(String group) {
+
+        if (UtilAll.isBlank(group)) {
+            return new ValidateResult(false, "The specified group is blank.");
+        }
+
+        if (isTopicOrGroupIllegal(group)) {
+            String falseRemark = "The specified group: " + group + ", contains 
illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
+            return new ValidateResult(false, falseRemark);
+        }
+
+        if (group.length() > GROUP_MAX_LENGTH) {
+            String falseRemark = "The specified group: " + group + ", is 
longer than group max length: " + GROUP_MAX_LENGTH;
+            return new ValidateResult(false, falseRemark);
+        }
+
+        return new ValidateResult(true, "");
     }
 
-    public static class ValidateTopicResult {
+    public static class ValidateResult {
         private final boolean valid;
         private final String remark;
 
-        public ValidateTopicResult(boolean valid, String remark) {
+        public ValidateResult(boolean valid, String remark) {
             this.valid = valid;
             this.remark = remark;
         }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java 
b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
index 1655eaab2b..0541c7f6d2 100644
--- 
a/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
+++ 
b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java
@@ -24,34 +24,26 @@ public class TopicValidatorTest {
 
     @Test
     public void testTopicValidator_NotPass() {
-        TopicValidator.ValidateTopicResult res = 
TopicValidator.validateTopic("");
+        TopicValidator.ValidateResult res = TopicValidator.validateTopic("");
         assertThat(res.isValid()).isFalse();
         assertThat(res.getRemark()).contains("The specified topic is blank");
 
         res = TopicValidator.validateTopic("../TopicTest");
         assertThat(res.isValid()).isFalse();
-        assertThat(res.getRemark()).contains("The specified topic contains 
illegal characters");
 
         res = TopicValidator.validateTopic(generateString(128));
         assertThat(res.isValid()).isFalse();
-        assertThat(res.getRemark()).contains("The specified topic is longer 
than topic max length.");
-
-        res = TopicValidator.validateTopic(generateString2(128));
-        assertThat(res.isValid()).isFalse();
-        assertThat(res.getRemark()).contains("The specified topic is longer 
than topic max length.");
 
         res = TopicValidator.validateTopic(generateRetryTopic(256));
         assertThat(res.isValid()).isFalse();
-        assertThat(res.getRemark()).contains("The specified topic is longer 
than topic max length.");
 
         res = TopicValidator.validateTopic(generateDlqTopic(256));
         assertThat(res.isValid()).isFalse();
-        assertThat(res.getRemark()).contains("The specified topic is longer 
than topic max length.");
     }
 
     @Test
     public void testTopicValidator_Pass() {
-        TopicValidator.ValidateTopicResult res = 
TopicValidator.validateTopic("TestTopic");
+        TopicValidator.ValidateResult res = 
TopicValidator.validateTopic("TestTopic");
         assertThat(res.isValid()).isTrue();
         assertThat(res.getRemark()).isEmpty();
 
@@ -68,6 +60,33 @@ public class TopicValidatorTest {
         assertThat(res.getRemark()).isEmpty();
     }
 
+    @Test
+    public void testGroupValidator_Pass() {
+        TopicValidator.ValidateResult res = 
TopicValidator.validateGroup("TestGroup");
+        assertThat(res.isValid()).isTrue();
+        assertThat(res.getRemark()).isEmpty();
+
+        res = TopicValidator.validateGroup(generateString2(120));
+        assertThat(res.isValid()).isTrue();
+        assertThat(res.getRemark()).isEmpty();
+    }
+
+    @Test
+    public void testGroupValidator__NotPass() {
+        TopicValidator.ValidateResult res = TopicValidator.validateGroup("");
+        assertThat(res.isValid()).isFalse();
+        assertThat(res.getRemark()).contains("The specified group is blank");
+
+        res = TopicValidator.validateGroup("../GroupTest");
+        assertThat(res.isValid()).isFalse();
+
+        res = TopicValidator.validateGroup(generateString(120));
+        assertThat(res.isValid()).isFalse();
+
+        res = TopicValidator.validateGroup(generateString2(121));
+        assertThat(res.isValid()).isFalse();
+    }
+
     @Test
     public void testAddSystemTopic() {
         String topic = "SYSTEM_TOPIC_TEST";
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
index 3c2967357f..f31a95770c 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
@@ -64,7 +64,7 @@ public class AbstractMessingActivityTest extends 
InitConfigTest {
         assertThrows(GrpcProxyException.class, () -> 
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(MixAll.CID_SYS_RMQ_TRANS).build()));
         assertThrows(GrpcProxyException.class, () -> 
messingActivity.validateConsumerGroup(Resource.newBuilder().setName("@").build()));
         assertThrows(GrpcProxyException.class, () -> 
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(256)).build()));
-        
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(255)).build());
+        
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(120)).build());
     }
 
     private static String createString(int len) {

Reply via email to