This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-alpha in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 18010a4d30e43a76e633a01fe0476f44de763c83 Author: Hongjian Fei <[email protected]> AuthorDate: Wed Jan 5 16:02:53 2022 +0800 [ISSUE #3679] Support topic attributes (#3698) * [Feature] Support topic attributes. * [Feature] Topic attributes unit-test. --- .../broker/processor/AdminBrokerProcessor.java | 18 +- .../rocketmq/broker/topic/TopicConfigManager.java | 143 +++++++++ .../broker/topic/TopicConfigManagerTest.java | 318 +++++++++++++++++++++ .../rocketmq/client/impl/MQClientAPIImpl.java | 3 + common/pom.xml | 13 - .../apache/rocketmq/common/TopicAttributes.java | 33 ++- .../org/apache/rocketmq/common/TopicConfig.java | 50 ++-- .../rocketmq/common/attribute/Attribute.java | 38 ++- .../rocketmq/common/attribute/AttributeParser.java | 79 +++++ .../common/attribute/BooleanAttribute.java | 32 ++- .../rocketmq/common/attribute/EnumAttribute.java | 32 ++- .../common/attribute/LongRangeAttribute.java | 35 ++- .../protocol/header/CreateTopicRequestHeader.java | 11 + .../common/attribute/AttributeParserTest.java | 70 +++++ .../rocketmq/common/attribute/AttributeTest.java | 70 +++++ .../apache/rocketmq/store/util/QueueTypeUtils.java | 20 ++ .../tools/command/topic/UpdateTopicSubCommand.java | 15 + 17 files changed, 873 insertions(+), 107 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f30953d..6505263 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -33,12 +33,14 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.attribute.AttributeParser; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -294,12 +296,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); topicConfig.setPerm(requestHeader.getPerm()); topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); + String attributesModification = requestHeader.getAttributes(); + topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); - this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - - this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); - - response.setCode(ResponseCode.SUCCESS); + try { + this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); + this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); + response.setCode(ResponseCode.SUCCESS); + } catch (Exception e) { + log.error("Update / create topic failed for [{}]", request, e); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(e.getMessage()); + } return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 4af230d..ba40538 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.broker.topic; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -25,11 +27,16 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; +import org.apache.rocketmq.common.attribute.Attribute; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; @@ -40,6 +47,8 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import static com.google.common.base.Preconditions.checkNotNull; + public class TopicConfigManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; @@ -353,6 +362,18 @@ public class TopicConfigManager extends ConfigManager { } public void updateTopicConfig(final TopicConfig topicConfig) { + checkNotNull(topicConfig, "topicConfig shouldn't be null"); + + Map<String, String> newAttributes = request(topicConfig); + Map<String, String> currentAttributes = current(topicConfig.getTopicName()); + + Map<String, String> finalAttributes = alterCurrentAttributes( + this.topicConfigTable.get(topicConfig.getTopicName()) == null, + ImmutableMap.copyOf(currentAttributes), + ImmutableMap.copyOf(newAttributes)); + + topicConfig.setAttributes(finalAttributes); + TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); if (old != null) { log.info("update topic config, old:[{}] new:[{}]", old, topicConfig); @@ -398,6 +419,11 @@ public class TopicConfigManager extends ConfigManager { } } + // make it testable + public Map<String, Attribute> allAttributes() { + return TopicAttributes.ALL; + } + public boolean isOrderTopic(final String topic) { TopicConfig topicConfig = this.topicConfigTable.get(topic); if (topicConfig == null) { @@ -471,4 +497,121 @@ public class TopicConfigManager extends ConfigManager { public ConcurrentMap<String, TopicConfig> getTopicConfigTable() { return topicConfigTable; } + + private Map<String, String> request(TopicConfig topicConfig) { + return topicConfig.getAttributes() == null ? new HashMap<>() : topicConfig.getAttributes(); + } + + private Map<String, String> current(String topic) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig == null) { + return new HashMap<>(); + } else { + Map<String, String> attributes = topicConfig.getAttributes(); + if (attributes == null) { + return new HashMap<>(); + } else { + return attributes; + } + } + } + + private Map<String, String> alterCurrentAttributes(boolean create, ImmutableMap<String, String> currentAttributes, ImmutableMap<String, String> newAttributes) { + Map<String, String> init = new HashMap<>(); + Map<String, String> add = new HashMap<>(); + Map<String, String> update = new HashMap<>(); + Map<String, String> delete = new HashMap<>(); + Set<String> keys = new HashSet<>(); + + for (Entry<String, String> attribute : newAttributes.entrySet()) { + String key = attribute.getKey(); + String realKey = realKey(key); + String value = attribute.getValue(); + + validate(realKey); + duplicationCheck(keys, realKey); + + if (create) { + if (key.startsWith("+")) { + init.put(realKey, value); + } else { + throw new RuntimeException("only add attribute is supported while creating topic. key: " + realKey); + } + } else { + if (key.startsWith("+")) { + if (!currentAttributes.containsKey(realKey)) { + add.put(realKey, value); + } else { + update.put(realKey, value); + } + } else if (key.startsWith("-")) { + if (!currentAttributes.containsKey(realKey)) { + throw new RuntimeException("attempt to delete a nonexistent key: " + realKey); + } + delete.put(realKey, value); + } else { + throw new RuntimeException("wrong format key: " + realKey); + } + } + } + + validateAlter(init, true, false); + validateAlter(add, false, false); + validateAlter(update, false, false); + validateAlter(delete, false, true); + + log.info("add: {}, update: {}, delete: {}", add, update, delete); + HashMap<String, String> finalAttributes = new HashMap<>(currentAttributes); + finalAttributes.putAll(init); + finalAttributes.putAll(add); + finalAttributes.putAll(update); + for (String s : delete.keySet()) { + finalAttributes.remove(s); + } + return finalAttributes; + } + + private void duplicationCheck(Set<String> keys, String key) { + boolean notExist = keys.add(key); + if (!notExist) { + throw new RuntimeException("alter duplication key. key: " + key); + } + } + + private void validate(String kvAttribute) { + if (Strings.isNullOrEmpty(kvAttribute)) { + throw new RuntimeException("kv string format wrong."); + } + + if (kvAttribute.contains("+")) { + throw new RuntimeException("kv string format wrong."); + } + + if (kvAttribute.contains("-")) { + throw new RuntimeException("kv string format wrong."); + } + } + + private void validateAlter(Map<String, String> alter, boolean init, boolean delete) { + for (Entry<String, String> entry : alter.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + Attribute attribute = allAttributes().get(key); + if (attribute == null) { + throw new RuntimeException("unsupported key: " + key); + } + if (!init && !attribute.isChangeable()) { + throw new RuntimeException("attempt to update an unchangeable attribute. key: " + key); + } + + if (!delete) { + attribute.verify(value); + } + } + } + + private String realKey(String key) { + return key.substring(1); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java new file mode 100644 index 0000000..9853c36 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.attribute.Attribute; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.TopicAttributes; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.attribute.BooleanAttribute; +import org.apache.rocketmq.common.attribute.EnumAttribute; +import org.apache.rocketmq.common.attribute.LongRangeAttribute; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.queue.CQType; +import org.apache.rocketmq.store.util.QueueTypeUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Sets.newHashSet; +import static java.util.Arrays.asList; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TopicConfigManagerTest { + private TopicConfigManager topicConfigManager; + @Mock + private BrokerController brokerController; + + @Before + public void init() { + BrokerConfig brokerConfig = new BrokerConfig(); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + + topicConfigManager = new TopicConfigManager(brokerController); + } + + @Test + public void testAddUnsupportedKeyOnCreating() { + String unsupportedKey = "key4"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+" + unsupportedKey, "value1"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("unsupported key: " + unsupportedKey, runtimeException.getMessage()); + } + + @Test + public void testAddWrongFormatKeyOnCreating() { + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("++enum.key", "value1"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("kv string format wrong.", runtimeException.getMessage()); + } + + @Test + public void testDeleteKeyOnCreating() { + String key = "enum.key"; + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("-" + key, ""); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("only add attribute is supported while creating topic. key: " + key, runtimeException.getMessage()); + } + + @Test + public void testAddWrongValueOnCreating() { + Map<String, String> attributes = new HashMap<>(); + attributes.put("+" + TopicAttributes.queueType.getName(), "wrong-value"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("value is not in set: [SimpleCQ, BatchCQ]", runtimeException.getMessage()); + } + + @Test + public void testNormalAddKeyOnCreating() { + String topic = "new-topic"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+long.range.key", "16"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setAttributes(attributes); + topicConfigManager.updateTopicConfig(topicConfig); + + TopicConfig existingTopicConfig = topicConfigManager.getTopicConfigTable().get(topic); + Assert.assertEquals("enum-2", existingTopicConfig.getAttributes().get("enum.key")); + Assert.assertEquals("16", existingTopicConfig.getAttributes().get("long.range.key")); +// assert file + } + + @Test + public void testAddDuplicatedKeyOnUpdating() { + String duplicatedKey = "long.range.key"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + createTopic(); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+" + duplicatedKey, "11"); + attributes.put("-" + duplicatedKey, ""); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("alter duplication key. key: " + duplicatedKey, runtimeException.getMessage()); + } + + private void createTopic() { + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-3"); + attributes.put("+bool.key", "true"); + attributes.put("+long.range.key", "12"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + } + + @Test + public void testDeleteNonexistentKeyOnUpdating() { + String key = "nonexisting.key"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+bool.key", "true"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("new-topic"); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + + attributes = new HashMap<>(); + attributes.clear(); + attributes.put("-" + key, ""); + topicConfig.setAttributes(attributes); + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("attempt to delete a nonexistent key: " + key, runtimeException.getMessage()); + } + + @Test + public void testAlterTopicWithoutChangingAttributes() { + String topic = "new-topic"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+bool.key", "true"); + + TopicConfig topicConfigInit = new TopicConfig(); + topicConfigInit.setTopicName(topic); + topicConfigInit.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfigInit); + Assert.assertEquals("enum-2", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("enum.key")); + Assert.assertEquals("true", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("bool.key")); + + TopicConfig topicConfigAlter = new TopicConfig(); + topicConfigAlter.setTopicName(topic); + topicConfigAlter.setReadQueueNums(10); + topicConfigAlter.setWriteQueueNums(10); + topicConfigManager.updateTopicConfig(topicConfigAlter); + Assert.assertEquals("enum-2", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("enum.key")); + Assert.assertEquals("true", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("bool.key")); + } + + @Test + public void testNormalUpdateUnchangeableKeyOnUpdating() { + String topic = "exist-topic"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", true, false), + new LongRangeAttribute("long.range.key", false, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+long.range.key", "14"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + + attributes.put("+long.range.key", "16"); + topicConfig.setAttributes(attributes); + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("attempt to update an unchangeable attribute. key: long.range.key", runtimeException.getMessage()); + } + + @Test + public void testNormalQueryKeyOnGetting() { + String topic = "exist-topic"; + String unchangeable = "bool.key"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+" + unchangeable, "true"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + + TopicConfig topicConfigUpdated = topicConfigManager.getTopicConfigTable().get(topic); + Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(topicConfigUpdated)); + + Assert.assertEquals("true", topicConfigUpdated.getAttributes().get(unchangeable)); + } + + private void supportAttributes(List<Attribute> supportAttributes) { + Map<String, Attribute> supportedAttributes = new HashMap<>(); + + for (Attribute supportAttribute : supportAttributes) { + supportedAttributes.put(supportAttribute.getName(), supportAttribute); + } + + topicConfigManager = spy(topicConfigManager); + when(topicConfigManager.allAttributes()).thenReturn(supportedAttributes); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 20b8f41..cd0d05b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -56,10 +56,12 @@ import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.attribute.AttributeParser; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -332,6 +334,7 @@ public class MQClientAPIImpl { requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setOrder(topicConfig.isOrder()); + requestHeader.setAttributes(AttributeParser.parseToString(topicConfig.getAttributes())); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); diff --git a/common/pom.xml b/common/pom.xml index dd8ea6a..bbbc713 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -27,19 +27,6 @@ <artifactId>rocketmq-common</artifactId> <name>rocketmq-common ${project.version}</name> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>6</source> - <target>6</target> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> <dependency> <groupId>${project.groupId}</groupId> diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java similarity index 56% copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java copy to common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java index 18e7f5a..9c1e96f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java @@ -14,22 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.store.util; +package org.apache.rocketmq.common; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.StreamMessageStore; -import org.apache.rocketmq.store.queue.CQType; +import org.apache.rocketmq.common.attribute.Attribute; +import org.apache.rocketmq.common.attribute.EnumAttribute; -public class QueueTypeUtils { +import java.util.HashMap; +import java.util.Map; - public static CQType getCQType(MessageStore messageStore) { - if (messageStore instanceof DefaultMessageStore) { - return CQType.SimpleCQ; - } else if (messageStore instanceof StreamMessageStore) { - return CQType.BatchCQ; - } else { - throw new RuntimeException("new cq type is not supported now."); - } +import static com.google.common.collect.Sets.newHashSet; + +public class TopicAttributes { + public static final EnumAttribute queueType = new EnumAttribute( + "queue.type", + false, + newHashSet("BatchCQ", "SimpleCQ"), + "SimpleCQ" + ); + public static final Map<String, Attribute> ALL; + + static { + ALL = new HashMap<>(); + ALL.put(queueType.getName(), queueType); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java index ec4d54b..610c3e2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.common; import org.apache.rocketmq.common.constant.PermName; +import java.util.Map; + public class TopicConfig { private static final String SEPARATOR = " "; public static int defaultReadQueueNums = 16; @@ -29,6 +31,7 @@ public class TopicConfig { private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG; private int topicSysFlag = 0; private boolean order = false; + private Map<String, String> attributes; public TopicConfig() { } @@ -72,6 +75,8 @@ public class TopicConfig { sb.append(SEPARATOR); sb.append(this.topicFilterType); + // Leave the encode/decode [attributes] out for now + return sb.toString(); } @@ -150,29 +155,29 @@ public class TopicConfig { this.order = isOrder; } - @Override - public boolean equals(final Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; + public Map<String, String> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, String> attributes) { + this.attributes = attributes; + } - final TopicConfig that = (TopicConfig) o; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; - if (readQueueNums != that.readQueueNums) - return false; - if (writeQueueNums != that.writeQueueNums) - return false; - if (perm != that.perm) - return false; - if (topicSysFlag != that.topicSysFlag) - return false; - if (order != that.order) - return false; - if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) - return false; - return topicFilterType == that.topicFilterType; + TopicConfig that = (TopicConfig) o; + if (readQueueNums != that.readQueueNums) return false; + if (writeQueueNums != that.writeQueueNums) return false; + if (perm != that.perm) return false; + if (topicSysFlag != that.topicSysFlag) return false; + if (order != that.order) return false; + if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false; + if (topicFilterType != that.topicFilterType) return false; + return attributes != null ? attributes.equals(that.attributes) : that.attributes == null; } @Override @@ -184,6 +189,7 @@ public class TopicConfig { result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0); result = 31 * result + topicSysFlag; result = 31 * result + (order ? 1 : 0); + result = 31 * result + (attributes != null ? attributes.hashCode() : 0); return result; } @@ -191,7 +197,7 @@ public class TopicConfig { public String toString() { return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm) - + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" - + order + "]"; + + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + order + + ", attributes=" + attributes + "]"; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java similarity index 56% copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java copy to common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java index 18e7f5a..ba9be3b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java @@ -14,22 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.store.util; +package org.apache.rocketmq.common.attribute; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.StreamMessageStore; -import org.apache.rocketmq.store.queue.CQType; +public abstract class Attribute { + protected String name; + protected boolean changeable; -public class QueueTypeUtils { + public abstract void verify(String value); - public static CQType getCQType(MessageStore messageStore) { - if (messageStore instanceof DefaultMessageStore) { - return CQType.SimpleCQ; - } else if (messageStore instanceof StreamMessageStore) { - return CQType.BatchCQ; - } else { - throw new RuntimeException("new cq type is not supported now."); - } + public Attribute(String name, boolean changeable) { + this.name = name; + this.changeable = changeable; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean isChangeable() { + return changeable; + } + + public void setChangeable(boolean changeable) { + this.changeable = changeable; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java new file mode 100644 index 0000000..7ee7afc --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.attribute; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AttributeParser { + public static Map<String, String> parseToMap(String attributesModification) { + if (Strings.isNullOrEmpty(attributesModification)) { + return new HashMap<>(); + } + + // format: +key1=value1,+key2=value2,-key3,+key4=value4 + Map<String, String> attributes = new HashMap<>(); + String arraySeparator = ","; + String kvSeparator = "="; + String[] kvs = attributesModification.split(arraySeparator); + for (String kv : kvs) { + String key; + String value; + if (kv.contains(kvSeparator)) { + key = kv.split(kvSeparator)[0]; + value = kv.split(kvSeparator)[1]; + if (!key.contains("+")) { + throw new RuntimeException("add/alter attribute format is wrong: " + key); + } + } else { + key = kv; + value = ""; + if (!key.contains("-")) { + throw new RuntimeException("delete attribute format is wrong: " + key); + } + } + String old = attributes.put(key, value); + if (old != null) { + throw new RuntimeException("key duplication: " + key); + } + } + return attributes; + } + + public static String parseToString(Map<String, String> attributes) { + if (attributes == null || attributes.size() == 0) { + return ""; + } + + List<String> kvs = new ArrayList<>(); + for (Map.Entry<String, String> entry : attributes.entrySet()) { + + String value = entry.getValue(); + if (Strings.isNullOrEmpty(value)) { + kvs.add(entry.getKey()); + } else { + kvs.add(entry.getKey() + "=" + entry.getValue()); + } + } + return Joiner.on(",").join(kvs); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java similarity index 54% copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java copy to common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java index 18e7f5a..41ad748 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java @@ -14,22 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.store.util; +package org.apache.rocketmq.common.attribute; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.StreamMessageStore; -import org.apache.rocketmq.store.queue.CQType; +import static com.google.common.base.Preconditions.checkNotNull; -public class QueueTypeUtils { +public class BooleanAttribute extends Attribute { + private final boolean defaultValue; - public static CQType getCQType(MessageStore messageStore) { - if (messageStore instanceof DefaultMessageStore) { - return CQType.SimpleCQ; - } else if (messageStore instanceof StreamMessageStore) { - return CQType.BatchCQ; - } else { - throw new RuntimeException("new cq type is not supported now."); + public BooleanAttribute(String name, boolean changeable, boolean defaultValue) { + super(name, changeable); + this.defaultValue = defaultValue; + } + + @Override + public void verify(String value) { + checkNotNull(value); + + if (!"false".equalsIgnoreCase(value) && !"true".equalsIgnoreCase(value)) { + throw new RuntimeException("boolean attribute format is wrong."); } } + + public boolean getDefaultValue() { + return defaultValue; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java similarity index 55% copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java copy to common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java index 18e7f5a..5353b8a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java @@ -14,22 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.store.util; +package org.apache.rocketmq.common.attribute; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.StreamMessageStore; -import org.apache.rocketmq.store.queue.CQType; +import java.util.Set; -public class QueueTypeUtils { +public class EnumAttribute extends Attribute { + private final Set<String> universe; + private final String defaultValue; - public static CQType getCQType(MessageStore messageStore) { - if (messageStore instanceof DefaultMessageStore) { - return CQType.SimpleCQ; - } else if (messageStore instanceof StreamMessageStore) { - return CQType.BatchCQ; - } else { - throw new RuntimeException("new cq type is not supported now."); + public EnumAttribute(String name, boolean changeable, Set<String> universe, String defaultValue) { + super(name, changeable); + this.universe = universe; + this.defaultValue = defaultValue; + } + + @Override + public void verify(String value) { + if (!this.universe.contains(value)) { + throw new RuntimeException("value is not in set: " + this.universe); } } + + public String getDefaultValue() { + return defaultValue; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java similarity index 52% copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java copy to common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java index 18e7f5a..eeeda72 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java @@ -14,22 +14,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.store.util; +package org.apache.rocketmq.common.attribute; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.StreamMessageStore; -import org.apache.rocketmq.store.queue.CQType; +import static java.lang.String.format; -public class QueueTypeUtils { +public class LongRangeAttribute extends Attribute { + private final long min; + private final long max; + private final long defaultValue; - public static CQType getCQType(MessageStore messageStore) { - if (messageStore instanceof DefaultMessageStore) { - return CQType.SimpleCQ; - } else if (messageStore instanceof StreamMessageStore) { - return CQType.BatchCQ; - } else { - throw new RuntimeException("new cq type is not supported now."); + public LongRangeAttribute(String name, boolean changeable, long min, long max, long defaultValue) { + super(name, changeable); + this.min = min; + this.max = max; + this.defaultValue = defaultValue; + } + + @Override + public void verify(String value) { + long l = Long.parseLong(value); + if (l < min || l > max) { + throw new RuntimeException(format("value is not in range(%d, %d)", min, max)); } } + + public long getDefaultValue() { + return defaultValue; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java index 290ec4c..2e381b3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java @@ -26,6 +26,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import java.util.Map; + public class CreateTopicRequestHeader implements CommandCustomHeader { @CFNotNull private String topic; @@ -42,6 +44,7 @@ public class CreateTopicRequestHeader implements CommandCustomHeader { private Integer topicSysFlag; @CFNotNull private Boolean order = false; + private String attributes; @CFNullable private Boolean force = false; @@ -130,4 +133,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader { public void setForce(Boolean force) { this.force = force; } + + public String getAttributes() { + return attributes; + } + + public void setAttributes(String attributes) { + this.attributes = attributes; + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java new file mode 100644 index 0000000..1239810 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.attribute; + +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; +import static org.junit.Assert.assertTrue; + +public class AttributeParserTest { + @Test + public void testParseToMap() { + Assert.assertEquals(0, AttributeParser.parseToMap(null).size()); + AttributeParser.parseToMap("++=++"); + AttributeParser.parseToMap("--"); + Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("x")); + Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("+")); + Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("++")); + } + + @Test + public void testParseToString() { + Assert.assertEquals("", AttributeParser.parseToString(null)); + Assert.assertEquals("", AttributeParser.parseToString(newHashMap())); + HashMap<String, String> map = new HashMap<>(); + int addSize = 10; + for (int i = 0; i < addSize; i++) { + map.put("+add.key" + i, "value" + i); + } + int deleteSize = 10; + for (int i = 0; i < deleteSize; i++) { + map.put("-delete.key" + i, ""); + } + Assert.assertEquals(addSize + deleteSize, AttributeParser.parseToString(map).split(",").length); + } + + @Test + public void testParseBetweenStringAndMapWithoutDistortion() { + List<String> testCases = Arrays.asList("-a", "+a=b,+c=d,+z=z,+e=e", "+a=b,-d", "+a=b", "-a,-b"); + for (String testCase : testCases) { + assertTrue(Maps.difference(AttributeParser.parseToMap(testCase), AttributeParser.parseToMap(parse(testCase))).areEqual()); + } + } + + private String parse(String original) { + Map<String, String> stringStringMap = AttributeParser.parseToMap(original); + return AttributeParser.parseToString(stringStringMap); + } +} diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java new file mode 100644 index 0000000..39a12b9 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.attribute; + +import org.junit.Assert; +import org.junit.Test; + +import static com.google.common.collect.Sets.newHashSet; + +public class AttributeTest { + + @Test + public void testEnumAttribute() { + EnumAttribute enumAttribute = new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"); + + Assert.assertThrows(RuntimeException.class, () -> enumAttribute.verify("")); + Assert.assertThrows(RuntimeException.class, () -> enumAttribute.verify("x")); + Assert.assertThrows(RuntimeException.class, () -> enumAttribute.verify("enum-4")); + + enumAttribute.verify("enum-1"); + enumAttribute.verify("enum-2"); + enumAttribute.verify("enum-3"); + } + + @Test + public void testLongRangeAttribute() { + LongRangeAttribute longRangeAttribute = new LongRangeAttribute("long.range.key", true, 10, 20, 15); + Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("")); + Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify(",")); + Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("a")); + Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("-1")); + Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("21")); + + longRangeAttribute.verify("11"); + longRangeAttribute.verify("10"); + longRangeAttribute.verify("20"); + } + + @Test + public void testBooleanAttribute() { + BooleanAttribute booleanAttribute = new BooleanAttribute("bool.key", false, false); + + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("")); + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("a")); + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify(",")); + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("checked")); + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("1")); + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("0")); + Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("-1")); + + booleanAttribute.verify("true"); + booleanAttribute.verify("tRue"); + booleanAttribute.verify("false"); + booleanAttribute.verify("falSe"); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java index 18e7f5a..612bf7e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java +++ b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java @@ -16,13 +16,18 @@ */ package org.apache.rocketmq.store.util; +import org.apache.rocketmq.common.TopicAttributes; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.StreamMessageStore; import org.apache.rocketmq.store.queue.CQType; +import java.util.Map; + public class QueueTypeUtils { + @Deprecated public static CQType getCQType(MessageStore messageStore) { if (messageStore instanceof DefaultMessageStore) { return CQType.SimpleCQ; @@ -32,4 +37,19 @@ public class QueueTypeUtils { throw new RuntimeException("new cq type is not supported now."); } } + + public static CQType getCQType(TopicConfig topicConfig) { + String attributeName = TopicAttributes.queueType.getName(); + + Map<String, String> attributes = topicConfig.getAttributes(); + if (attributes == null || attributes.size() == 0) { + return CQType.valueOf(TopicAttributes.queueType.getDefaultValue()); + } + + if (attributes.containsKey(attributeName)) { + return CQType.valueOf(attributes.get(attributeName)); + } else { + return CQType.valueOf(TopicAttributes.queueType.getDefaultValue()); + } + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java index c33ae52..3caa477 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java @@ -16,12 +16,16 @@ */ package org.apache.rocketmq.tools.command.topic; +import java.util.Map; import java.util.Set; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.attribute.AttributeParser; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; @@ -55,6 +59,10 @@ public class UpdateTopicSubCommand implements SubCommand { optionGroup.setRequired(true); options.addOptionGroup(optionGroup); + opt = new Option("a", "attributes", true, "attribute(+a=b,+c=d,-e)"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option("t", "topic", true, "topic name"); opt.setRequired(true); options.addOption(opt); @@ -98,6 +106,12 @@ public class UpdateTopicSubCommand implements SubCommand { topicConfig.setWriteQueueNums(8); topicConfig.setTopicName(commandLine.getOptionValue('t').trim()); + if (commandLine.hasOption('a')) { + String attributesModification = commandLine.getOptionValue('a').trim(); + Map<String, String> attributes = AttributeParser.parseToMap(attributesModification); + topicConfig.setAttributes(attributes); + } + // readQueueNums if (commandLine.hasOption('r')) { topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim())); @@ -187,4 +201,5 @@ public class UpdateTopicSubCommand implements SubCommand { defaultMQAdminExt.shutdown(); } } + }
