This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new f12eceb Add the UpdateStaticTopicSubCommand
f12eceb is described below
commit f12eceb402766b83139085309327ec3f672e56c7
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 17:28:08 2021 +0800
Add the UpdateStaticTopicSubCommand
---
.../apache/rocketmq/common/rpc/ClientMetadata.java | 37 ++++
.../command/topic/UpdateStaticTopicSubCommand.java | 200 ++++++++++-----------
2 files changed, 135 insertions(+), 102 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index e2dd076..c37a065 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -10,6 +10,7 @@ import
org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +26,42 @@ public class ClientMetadata {
private final ConcurrentMap<String/* Broker Name */, HashMap<String/*
address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
+ public void freshTopicRoute(String topic, TopicRouteData topicRouteData) {
+ if (topic == null
+ || topicRouteData == null) {
+ return;
+ }
+ TopicRouteData old = this.topicRouteTable.get(topic);
+ if (!topicRouteDataIsChange(old, topicRouteData)) {
+ return ;
+ }
+ {
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ this.brokerAddrTable.put(bd.getBrokerName(),
bd.getBrokerAddrs());
+ }
+ }
+ {
+ ConcurrentMap<MessageQueue, String> mqEndPoints =
topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
+ if (mqEndPoints != null
+ && !mqEndPoints.isEmpty()) {
+ topicEndPointsTable.put(topic, mqEndPoints);
+ }
+ }
+ }
+
+
+ public static boolean topicRouteDataIsChange(TopicRouteData olddata,
TopicRouteData nowdata) {
+ if (olddata == null || nowdata == null)
+ return true;
+ TopicRouteData old = new TopicRouteData(olddata);
+ TopicRouteData now = new TopicRouteData(nowdata);
+ Collections.sort(old.getQueueDatas());
+ Collections.sort(old.getBrokerDatas());
+ Collections.sort(now.getQueueDatas());
+ Collections.sort(now.getBrokerDatas());
+ return !old.equals(now);
+
+ }
public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
if (topicEndPointsTable.get(mq.getTopic()) != null
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index dc23780..05f2807 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -16,22 +16,27 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
-import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
+import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-import
org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
public class UpdateStaticTopicSubCommand implements SubCommand {
@@ -68,124 +73,115 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
return options;
}
+ private void validate(Map.Entry<String, TopicConfigAndQueueMapping> entry,
boolean shouldNull) {
+ if (shouldNull) {
+ if (entry.getValue().getTopicQueueMappingInfo() != null) {
+ throw new RuntimeException("Mapping info should be null in
broker " + entry.getKey());
+ }
+ } else {
+ if (entry.getValue().getTopicQueueMappingInfo() == null) {
+ throw new RuntimeException("Mapping info should not be null in
broker " + entry.getKey());
+ }
+ if
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
{
+ throw new RuntimeException(String.format("The broker name is
not equal %s != %s ", entry.getKey(),
entry.getValue().getTopicQueueMappingInfo().getBname()));
+ }
+ }
+ }
+
+ public void validateQueueMappingInfo(Map<Integer,
ImmutableList<LogicQueueMappingItem>> globalIdMap, TopicQueueMappingDetail
mappingDetail) {
+ if (mappingDetail.isDirty()) {
+ throw new RuntimeException("The mapping info is dirty in broker "
+ mappingDetail.getBname());
+ }
+ for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry :
mappingDetail.getHostedQueues().entrySet()) {
+ Integer globalid = entry.getKey();
+ String leaerBrokerName =
entry.getValue().iterator().next().getBname();
+ if (!leaerBrokerName.equals(mappingDetail.getBname())) {
+ //not the leader
+ continue;
+ }
+ if (globalIdMap.containsKey(globalid)) {
+ throw new RuntimeException(String.format("The queue id is
duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
+ } else {
+ globalIdMap.put(globalid, entry.getValue());
+ }
+ }
+ }
+
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ ClientMetadata clientMetadata = new ClientMetadata();
+ Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new
HashMap<>();
+ Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new
HashMap<>();
try {
String topic = commandLine.getOptionValue('t').trim();
int queueNum =
Integer.parseInt(commandLine.getOptionValue("qn").trim());
String cluster = commandLine.getOptionValue('c').trim();
+ ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo == null
+ || clusterInfo.getClusterAddrTable().isEmpty()
+ || clusterInfo.getClusterAddrTable().get(cluster) == null
+ ||
clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) {
+ throw new RuntimeException("The Cluster info is null for " +
cluster);
+ }
+ clientMetadata.refreshClusterInfo(clusterInfo);
- //first check the topic route
+ //first get the existed topic config and mapping
{
TopicRouteData routeData =
defaultMQAdminExt.examineTopicRouteInfo(topic);
+ clientMetadata.freshTopicRoute(topic, routeData);
+ if (routeData != null
+ && !routeData.getQueueDatas().isEmpty()) {
+ for (QueueData queueData: routeData.getQueueDatas()) {
+ String bname = queueData.getBrokerName();
+ String addr =
clientMetadata.findMasterBrokerAddr(bname);
+ TopicConfigAndQueueMapping mapping =
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the mapping info is null
+ if (mapping != null) {
+ existedTopicConfigMap.put(bname, mapping);
+ }
+ }
+ }
}
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setReadQueueNums(8);
- topicConfig.setWriteQueueNums(8);
- topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
-
- // readQueueNums
- if (commandLine.hasOption('r')) {
-
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
- }
-
- // writeQueueNums
- if (commandLine.hasOption('w')) {
-
topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
- }
-
- // perm
- if (commandLine.hasOption('p')) {
-
topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
- }
-
- boolean isUnit = false;
- if (commandLine.hasOption('u')) {
- isUnit =
Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
- }
-
- boolean isCenterSync = false;
- if (commandLine.hasOption('s')) {
- isCenterSync =
Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
+ {
+ if (!existedTopicConfigMap.isEmpty()) {
+ Iterator<Map.Entry<String, TopicConfigAndQueueMapping>> it
= existedTopicConfigMap.entrySet().iterator();
+ Map.Entry<String, TopicConfigAndQueueMapping> first =
it.next();
+ validate(first, false);
+ validateQueueMappingInfo(globalIdMap,
first.getValue().getTopicQueueMappingInfo());
+ TopicQueueMappingDetail firstMapping =
first.getValue().getTopicQueueMappingInfo();
+ while (it.hasNext()) {
+ Map.Entry<String, TopicConfigAndQueueMapping> next =
it.next();
+ validate(next, false);
+ validateQueueMappingInfo(globalIdMap,
next.getValue().getTopicQueueMappingInfo());
+ TopicQueueMappingDetail nextMapping =
next.getValue().getTopicQueueMappingInfo();
+ if (firstMapping.getEpoch() !=
nextMapping.getEpoch()) {
+ throw new RuntimeException(String.format("epoch
dose not match %d != %d in %s %s", firstMapping.getEpoch(),
nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname()));
+ }
+ if (firstMapping.getTotalQueues() !=
nextMapping.getTotalQueues()) {
+ throw new RuntimeException(String.format("total
queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(),
nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname()));
+ }
+ }
+ if (firstMapping.getTotalQueues() != globalIdMap.size()) {
+ throw new RuntimeException(String.format("The total
queue number in config dose not match the real hosted queues %d != %d",
firstMapping.getTotalQueues(), globalIdMap.size()));
+ }
+ }
}
-
- int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit,
isCenterSync);
- topicConfig.setTopicSysFlag(topicCenterSync);
-
- boolean isOrder = false;
- if (commandLine.hasOption('o')) {
- isOrder =
Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
+ if (queueNum < globalIdMap.size()) {
+ throw new RuntimeException(String.format("Cannot decrease the
queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
- topicConfig.setOrder(isOrder);
-
- boolean useLogicalQueue = false;
- if (commandLine.hasOption("lq")) {
- useLogicalQueue =
Boolean.parseBoolean(commandLine.getOptionValue("lq").trim());
+ //check the queue number
+ if (queueNum == globalIdMap.size()) {
+ throw new RuntimeException("The topic queue num is equal the
existed queue num, do nothing");
}
+ //the check is ok, now do the real
- if (commandLine.hasOption('b')) {
- if (useLogicalQueue) {
- System.out.printf("-lq and -b can not be used
together.%n");
- return;
- }
-
- String addr = commandLine.getOptionValue('b').trim();
-
- defaultMQAdminExt.start();
- defaultMQAdminExt.createAndUpdateTopicConfig(addr,
topicConfig);
-
- if (isOrder) {
- String brokerName =
CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
- String orderConf = brokerName + ":" +
topicConfig.getWriteQueueNums();
-
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
orderConf, false);
- System.out.printf("%s", String.format("set broker
orderConf. isOrder=%s, orderConf=[%s]",
- isOrder, orderConf.toString()));
- }
- System.out.printf("create topic to %s success.%n", addr);
- System.out.printf("%s", topicConfig);
- return;
- } else if (commandLine.hasOption('c')) {
- String clusterName = commandLine.getOptionValue('c').trim();
-
- defaultMQAdminExt.start();
-
- Set<String> masterSet =
-
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
-
- for (String addr : masterSet) {
- defaultMQAdminExt.createAndUpdateTopicConfig(addr,
topicConfig);
- System.out.printf("create topic to %s success.%n", addr);
- }
-
- if (isOrder) {
- Set<String> brokerNameSet =
-
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
- StringBuilder orderConf = new StringBuilder();
- String splitor = "";
- for (String s : brokerNameSet) {
- orderConf.append(splitor).append(s).append(":")
- .append(topicConfig.getWriteQueueNums());
- splitor = ";";
- }
-
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
- orderConf.toString(), true);
- System.out.printf("set cluster orderConf. isOrder=%s,
orderConf=[%s]", isOrder, orderConf);
- }
-
- System.out.printf("%s", topicConfig);
-
- if (useLogicalQueue) {
- new
UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt,
topicConfig.getTopicName(), masterSet);
- }
- return;
- }
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(),
options);
} catch (Exception e) {