This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aaa4a4b5fc Add attribute for SubscriptionGroupConfig (#6891)
aaa4a4b5fc is described below

commit aaa4a4b5fcc1f3b395264375b9067303820cd38c
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Jun 14 11:46:46 2023 +0800

    Add attribute for SubscriptionGroupConfig (#6891)
---
 .../subscription/SubscriptionGroupManager.java     |  36 +++++-
 .../rocketmq/broker/topic/TopicConfigManager.java  | 106 +----------------
 .../subscription/SubscriptionGroupManagerTest.java |  77 ++++++++++++
 .../broker/topic/TopicConfigManagerTest.java       |  17 ++-
 .../common/SubscriptionGroupAttributes.java        |  29 +++++
 .../rocketmq/common/attribute/AttributeUtil.java   | 132 +++++++++++++++++++++
 .../subscription/SubscriptionGroupConfig.java      |  18 ++-
 .../command/consumer/UpdateSubGroupSubCommand.java |  12 ++
 8 files changed, 312 insertions(+), 115 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 808f370588..db8c8b6f23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -16,7 +16,10 @@
  */
 package org.apache.rocketmq.broker.subscription;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -26,11 +29,13 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.SubscriptionGroupAttributes;
+import org.apache.rocketmq.common.attribute.AttributeUtil;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
@@ -111,6 +116,17 @@ public class SubscriptionGroupManager extends ConfigManager {
     }
 
     public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig 
config) {
+        Map<String, String> newAttributes = request(config);
+        Map<String, String> currentAttributes = current(config.getGroupName());
+
+        Map<String, String> finalAttributes = 
AttributeUtil.alterCurrentAttributes(
+            this.subscriptionGroupTable.get(config.getGroupName()) == null,
+            SubscriptionGroupAttributes.ALL,
+            ImmutableMap.copyOf(currentAttributes),
+            ImmutableMap.copyOf(newAttributes));
+
+        config.setAttributes(finalAttributes);
+
         SubscriptionGroupConfig old = 
this.subscriptionGroupTable.put(config.getGroupName(), config);
         if (old != null) {
             log.info("update subscription group config, old: {} new: {}", old, 
config);
@@ -315,4 +331,22 @@ public class SubscriptionGroupManager extends ConfigManager {
 
         return subscriptionGroupTable.containsKey(group);
     }
+
+    private Map<String, String> request(SubscriptionGroupConfig 
subscriptionGroupConfig) {
+        return subscriptionGroupConfig.getAttributes() == null ? new 
HashMap<>() : subscriptionGroupConfig.getAttributes();
+    }
+
+    private Map<String, String> current(String groupName) {
+        SubscriptionGroupConfig subscriptionGroupConfig = 
this.subscriptionGroupTable.get(groupName);
+        if (subscriptionGroupConfig == null) {
+            return new HashMap<>();
+        } else {
+            Map<String, String> attributes = 
subscriptionGroupConfig.getAttributes();
+            if (attributes == null) {
+                return new HashMap<>();
+            } else {
+                return attributes;
+            }
+        }
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 16140d4cd8..e5fdd8675f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -16,10 +16,8 @@
  */
 package org.apache.rocketmq.broker.topic;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,6 +36,7 @@ import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.attribute.AttributeUtil;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
@@ -466,8 +465,9 @@ public class TopicConfigManager extends ConfigManager {
         Map<String, String> newAttributes = request(topicConfig);
         Map<String, String> currentAttributes = 
current(topicConfig.getTopicName());
 
-        Map<String, String> finalAttributes = alterCurrentAttributes(
+        Map<String, String> finalAttributes = 
AttributeUtil.alterCurrentAttributes(
             this.topicConfigTable.get(topicConfig.getTopicName()) == null,
+            TopicAttributes.ALL,
             ImmutableMap.copyOf(currentAttributes),
             ImmutableMap.copyOf(newAttributes));
 
@@ -628,106 +628,6 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-    private Map<String, String> alterCurrentAttributes(boolean create, 
ImmutableMap<String, String> currentAttributes,
-        ImmutableMap<String, String> newAttributes) {
-        Map<String, String> init = new HashMap<>();
-        Map<String, String> add = new HashMap<>();
-        Map<String, String> update = new HashMap<>();
-        Map<String, String> delete = new HashMap<>();
-        Set<String> keys = new HashSet<>();
-
-        for (Entry<String, String> attribute : newAttributes.entrySet()) {
-            String key = attribute.getKey();
-            String realKey = realKey(key);
-            String value = attribute.getValue();
-
-            validate(realKey);
-            duplicationCheck(keys, realKey);
-
-            if (create) {
-                if (key.startsWith("+")) {
-                    init.put(realKey, value);
-                } else {
-                    throw new RuntimeException("only add attribute is 
supported while creating topic. key: " + realKey);
-                }
-            } else {
-                if (key.startsWith("+")) {
-                    if (!currentAttributes.containsKey(realKey)) {
-                        add.put(realKey, value);
-                    } else {
-                        update.put(realKey, value);
-                    }
-                } else if (key.startsWith("-")) {
-                    if (!currentAttributes.containsKey(realKey)) {
-                        throw new RuntimeException("attempt to delete a 
nonexistent key: " + realKey);
-                    }
-                    delete.put(realKey, value);
-                } else {
-                    throw new RuntimeException("wrong format key: " + realKey);
-                }
-            }
-        }
-
-        validateAlter(init, true, false);
-        validateAlter(add, false, false);
-        validateAlter(update, false, false);
-        validateAlter(delete, false, true);
-
-        log.info("add: {}, update: {}, delete: {}", add, update, delete);
-        HashMap<String, String> finalAttributes = new 
HashMap<>(currentAttributes);
-        finalAttributes.putAll(init);
-        finalAttributes.putAll(add);
-        finalAttributes.putAll(update);
-        for (String s : delete.keySet()) {
-            finalAttributes.remove(s);
-        }
-        return finalAttributes;
-    }
-
-    private void duplicationCheck(Set<String> keys, String key) {
-        boolean notExist = keys.add(key);
-        if (!notExist) {
-            throw new RuntimeException("alter duplication key. key: " + key);
-        }
-    }
-
-    private void validate(String kvAttribute) {
-        if (Strings.isNullOrEmpty(kvAttribute)) {
-            throw new RuntimeException("kv string format wrong.");
-        }
-
-        if (kvAttribute.contains("+")) {
-            throw new RuntimeException("kv string format wrong.");
-        }
-
-        if (kvAttribute.contains("-")) {
-            throw new RuntimeException("kv string format wrong.");
-        }
-    }
-
-    private void validateAlter(Map<String, String> alter, boolean init, 
boolean delete) {
-        for (Entry<String, String> entry : alter.entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-
-            Attribute attribute = allAttributes().get(key);
-            if (attribute == null) {
-                throw new RuntimeException("unsupported key: " + key);
-            }
-            if (!init && !attribute.isChangeable()) {
-                throw new RuntimeException("attempt to update an unchangeable 
attribute. key: " + key);
-            }
-
-            if (!delete) {
-                attribute.verify(value);
-            }
-        }
-    }
-
-    private String realKey(String key) {
-        return key.substring(1);
-    }
-
     public boolean containsTopic(String topic) {
         return topicConfigTable.containsKey(topic);
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
new file mode 100644
index 0000000000..6337c69ea7
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.subscription;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.SubscriptionGroupAttributes;
+import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionGroupManagerTest {
+    private String group = "group";
+    @Mock
+    private BrokerController brokerControllerMock;
+    private SubscriptionGroupManager subscriptionGroupManager;
+
+    @Before
+    public void before() {
+        SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
+            "test",
+            false,
+            false
+        ));
+        subscriptionGroupManager = spy(new 
SubscriptionGroupManager(brokerControllerMock));
+        when(brokerControllerMock.getMessageStore()).thenReturn(null);
+        doNothing().when(subscriptionGroupManager).persist();
+    }
+
+    @Test
+    public void updateSubscriptionGroupConfig() {
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(group);
+        Map<String, String> attr = ImmutableMap.of("+test", "true");
+        subscriptionGroupConfig.setAttributes(attr);
+        
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+        SubscriptionGroupConfig result = 
subscriptionGroupManager.getSubscriptionGroupTable().get(group);
+        assertThat(result).isNotNull();
+        assertThat(result.getGroupName()).isEqualTo(group);
+        assertThat(result.getAttributes().get("test")).isEqualTo("true");
+
+
+        SubscriptionGroupConfig subscriptionGroupConfig1 = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig1.setGroupName(group);
+        Map<String, String> attrRemove = ImmutableMap.of("-test", "");
+        subscriptionGroupConfig1.setAttributes(attrRemove);
+        assertThatThrownBy(() -> 
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig1))
+            .isInstanceOf(RuntimeException.class).hasMessage("attempt to 
update an unchangeable attribute. key: test");
+    }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index b77c44961a..6052a79d41 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -16,18 +16,22 @@
  */
 package org.apache.rocketmq.broker.topic;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.attribute.Attribute;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.Attribute;
 import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.attribute.EnumAttribute;
 import org.apache.rocketmq.common.attribute.LongRangeAttribute;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.common.attribute.CQType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,14 +39,8 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
 import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Arrays.asList;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -318,7 +316,6 @@ public class TopicConfigManagerTest {
             supportedAttributes.put(supportAttribute.getName(), 
supportAttribute);
         }
 
-        topicConfigManager = spy(topicConfigManager);
-        
when(topicConfigManager.allAttributes()).thenReturn(supportedAttributes);
+        TopicAttributes.ALL.putAll(supportedAttributes);
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
new file mode 100644
index 0000000000..5b0072401c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.attribute.Attribute;
+
+public class SubscriptionGroupAttributes {
+    public static final Map<String, Attribute> ALL;
+
+    static {
+        ALL = new HashMap<>();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeUtil.java b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeUtil.java
new file mode 100644
index 0000000000..a3646988c5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.attribute;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class AttributeUtil {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+    public static Map<String, String> alterCurrentAttributes(boolean create, 
Map<String, Attribute> all,
+        ImmutableMap<String, String> currentAttributes, ImmutableMap<String, 
String> newAttributes) {
+
+        Map<String, String> init = new HashMap<>();
+        Map<String, String> add = new HashMap<>();
+        Map<String, String> update = new HashMap<>();
+        Map<String, String> delete = new HashMap<>();
+        Set<String> keys = new HashSet<>();
+
+        for (Map.Entry<String, String> attribute : newAttributes.entrySet()) {
+            String key = attribute.getKey();
+            String realKey = realKey(key);
+            String value = attribute.getValue();
+
+            validate(realKey);
+            duplicationCheck(keys, realKey);
+
+            if (create) {
+                if (key.startsWith("+")) {
+                    init.put(realKey, value);
+                } else {
+                    throw new RuntimeException("only add attribute is 
supported while creating topic. key: " + realKey);
+                }
+            } else {
+                if (key.startsWith("+")) {
+                    if (!currentAttributes.containsKey(realKey)) {
+                        add.put(realKey, value);
+                    } else {
+                        update.put(realKey, value);
+                    }
+                } else if (key.startsWith("-")) {
+                    if (!currentAttributes.containsKey(realKey)) {
+                        throw new RuntimeException("attempt to delete a 
nonexistent key: " + realKey);
+                    }
+                    delete.put(realKey, value);
+                } else {
+                    throw new RuntimeException("wrong format key: " + realKey);
+                }
+            }
+        }
+
+        validateAlter(all, init, true, false);
+        validateAlter(all, add, false, false);
+        validateAlter(all, update, false, false);
+        validateAlter(all, delete, false, true);
+
+        log.info("add: {}, update: {}, delete: {}", add, update, delete);
+        HashMap<String, String> finalAttributes = new 
HashMap<>(currentAttributes);
+        finalAttributes.putAll(init);
+        finalAttributes.putAll(add);
+        finalAttributes.putAll(update);
+        for (String s : delete.keySet()) {
+            finalAttributes.remove(s);
+        }
+        return finalAttributes;
+    }
+
+    private static void duplicationCheck(Set<String> keys, String key) {
+        boolean notExist = keys.add(key);
+        if (!notExist) {
+            throw new RuntimeException("alter duplication key. key: " + key);
+        }
+    }
+
+    private static void validate(String kvAttribute) {
+        if (Strings.isNullOrEmpty(kvAttribute)) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+
+        if (kvAttribute.contains("+")) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+
+        if (kvAttribute.contains("-")) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+    }
+
+    private static void validateAlter(Map<String, Attribute> all, Map<String, 
String> alter, boolean init, boolean delete) {
+        for (Map.Entry<String, String> entry : alter.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            Attribute attribute = all.get(key);
+            if (attribute == null) {
+                throw new RuntimeException("unsupported key: " + key);
+            }
+            if (!init && !attribute.isChangeable()) {
+                throw new RuntimeException("attempt to update an unchangeable 
attribute. key: " + key);
+            }
+
+            if (!delete) {
+                attribute.verify(value);
+            }
+        }
+    }
+
+    private static String realKey(String key) {
+        return key.substring(1);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
index 799c7492e9..5522059aaf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.remoting.protocol.subscription;
 
 import com.google.common.base.MoreObjects;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.rocketmq.common.MixAll;
@@ -49,6 +51,8 @@ public class SubscriptionGroupConfig {
 
     private Set<SimpleSubscriptionData> subscriptionDataSet;
 
+    private Map<String, String> attributes = new HashMap<>();
+
     public String getGroupName() {
         return groupName;
     }
@@ -161,6 +165,14 @@ public class SubscriptionGroupConfig {
         this.subscriptionDataSet = subscriptionDataSet;
     }
 
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    public void setAttributes(Map<String, String> attributes) {
+        this.attributes = attributes;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -178,6 +190,7 @@ public class SubscriptionGroupConfig {
         result = prime * result + groupSysFlag;
         result = prime * result + consumeTimeoutMinute;
         result = prime * result + subscriptionDataSet.hashCode();
+        result = prime * result + attributes.hashCode();
         return result;
     }
 
@@ -202,6 +215,7 @@ public class SubscriptionGroupConfig {
             .append(groupSysFlag, other.groupSysFlag)
             .append(consumeTimeoutMinute, other.consumeTimeoutMinute)
             .append(subscriptionDataSet, other.subscriptionDataSet)
+            .append(attributes, other.attributes)
             .isEquals();
     }
 
@@ -216,11 +230,13 @@ public class SubscriptionGroupConfig {
             .add("retryQueueNums", retryQueueNums)
             .add("retryMaxTimes", retryMaxTimes)
             .add("groupRetryPolicy", groupRetryPolicy)
+            .add("brokerId", brokerId)
             .add("whichBrokerWhenConsumeSlowly", whichBrokerWhenConsumeSlowly)
             .add("notifyConsumerIdsChangedEnable", 
notifyConsumerIdsChangedEnable)
             .add("groupSysFlag", groupSysFlag)
             .add("consumeTimeoutMinute", consumeTimeoutMinute)
-            .add("subscriptionTopicSet", subscriptionDataSet)
+            .add("subscriptionDataSet", subscriptionDataSet)
+            .add("attributes", attributes)
             .toString();
     }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
index fddf6015de..f87bafc930 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
@@ -17,10 +17,12 @@
 package org.apache.rocketmq.tools.command.consumer;
 
 import com.alibaba.fastjson.JSON;
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -99,6 +101,10 @@ public class UpdateSubGroupSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option(null, "attributes", true, "attribute(+a=b,+c=d,-e)");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -177,6 +183,12 @@ public class UpdateSubGroupSubCommand implements SubCommand {
                     .getOptionValue('a').trim()));
             }
 
+            if (commandLine.hasOption("attributes")) {
+                String attributesModification = 
commandLine.getOptionValue("attributes").trim();
+                Map<String, String> attributes = 
AttributeParser.parseToMap(attributesModification);
+                subscriptionGroupConfig.setAttributes(attributes);
+            }
+
             if (commandLine.hasOption('b')) {
                 String addr = commandLine.getOptionValue('b').trim();
 

Reply via email to