This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new f12eceb  Add the UpdateStaticTopicSubCommand
f12eceb is described below

commit f12eceb402766b83139085309327ec3f672e56c7
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 17:28:08 2021 +0800

    Add the UpdateStaticTopicSubCommand
---
 .../apache/rocketmq/common/rpc/ClientMetadata.java |  37 ++++
 .../command/topic/UpdateStaticTopicSubCommand.java | 200 ++++++++++-----------
 2 files changed, 135 insertions(+), 102 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java 
b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index e2dd076..c37a065 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -10,6 +10,7 @@ import 
org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +26,42 @@ public class ClientMetadata {
     private final ConcurrentMap<String/* Broker Name */, HashMap<String/* 
address */, Integer>> brokerVersionTable =
             new ConcurrentHashMap<String, HashMap<String, Integer>>();
 
+    public void freshTopicRoute(String topic, TopicRouteData topicRouteData) {
+        if (topic == null
+            || topicRouteData == null) {
+            return;
+        }
+        TopicRouteData old = this.topicRouteTable.get(topic);
+        if (!topicRouteDataIsChange(old, topicRouteData)) {
+            return ;
+        }
+        {
+            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                this.brokerAddrTable.put(bd.getBrokerName(), 
bd.getBrokerAddrs());
+            }
+        }
+        {
+            ConcurrentMap<MessageQueue, String> mqEndPoints = 
topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
+            if (mqEndPoints != null
+                    && !mqEndPoints.isEmpty()) {
+                topicEndPointsTable.put(topic, mqEndPoints);
+            }
+        }
+    }
+
+
+    public static boolean topicRouteDataIsChange(TopicRouteData olddata, 
TopicRouteData nowdata) {
+        if (olddata == null || nowdata == null)
+            return true;
+        TopicRouteData old = new TopicRouteData(olddata);
+        TopicRouteData now = new TopicRouteData(nowdata);
+        Collections.sort(old.getQueueDatas());
+        Collections.sort(old.getBrokerDatas());
+        Collections.sort(now.getQueueDatas());
+        Collections.sort(now.getBrokerDatas());
+        return !old.equals(now);
+
+    }
 
     public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
         if (topicEndPointsTable.get(mq.getTopic()) != null
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index dc23780..05f2807 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -16,22 +16,27 @@
  */
 package org.apache.rocketmq.tools.command.topic;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
-import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
+import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.CommandUtil;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
-import 
org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
 
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 public class UpdateStaticTopicSubCommand implements SubCommand {
 
@@ -68,124 +73,115 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
         return options;
     }
 
+    private void validate(Map.Entry<String, TopicConfigAndQueueMapping> entry, 
boolean shouldNull) {
+        if (shouldNull) {
+            if (entry.getValue().getTopicQueueMappingInfo() != null) {
+                throw new RuntimeException("Mapping info should be null in 
broker " + entry.getKey());
+            }
+        } else {
+            if (entry.getValue().getTopicQueueMappingInfo() == null) {
+                throw new RuntimeException("Mapping info should not be null in 
broker  " + entry.getKey());
+            }
+            if 
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
 {
+                throw new RuntimeException(String.format("The broker name is 
not equal %s != %s ",  entry.getKey(), 
entry.getValue().getTopicQueueMappingInfo().getBname()));
+            }
+        }
+    }
+
+    public void validateQueueMappingInfo(Map<Integer, 
ImmutableList<LogicQueueMappingItem>> globalIdMap, TopicQueueMappingDetail 
mappingDetail) {
+        if (mappingDetail.isDirty()) {
+            throw new RuntimeException("The mapping info is dirty in broker  " 
+ mappingDetail.getBname());
+        }
+        for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  entry : 
mappingDetail.getHostedQueues().entrySet()) {
+            Integer globalid = entry.getKey();
+            String leaerBrokerName  = 
entry.getValue().iterator().next().getBname();
+            if (!leaerBrokerName.equals(mappingDetail.getBname())) {
+                //not the leader
+                continue;
+            }
+            if (globalIdMap.containsKey(globalid)) {
+                throw new RuntimeException(String.format("The queue id is 
duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
+            } else {
+                globalIdMap.put(globalid, entry.getValue());
+            }
+        }
+    }
+
     @Override
     public void execute(final CommandLine commandLine, final Options options,
         RPCHook rpcHook) throws SubCommandException {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
 
+        ClientMetadata clientMetadata = new ClientMetadata();
+        Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new 
HashMap<>();
+        Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new 
HashMap<>();
         try {
 
             String topic = commandLine.getOptionValue('t').trim();
             int queueNum = 
Integer.parseInt(commandLine.getOptionValue("qn").trim());
             String cluster = commandLine.getOptionValue('c').trim();
+            ClusterInfo clusterInfo  = 
defaultMQAdminExt.examineBrokerClusterInfo();
+            if (clusterInfo == null
+                    || clusterInfo.getClusterAddrTable().isEmpty()
+                    || clusterInfo.getClusterAddrTable().get(cluster) == null
+                    || 
clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) {
+                throw new RuntimeException("The Cluster info is null for " + 
cluster);
+            }
+            clientMetadata.refreshClusterInfo(clusterInfo);
 
-            //first check the topic route
+            //first get the existed topic config and mapping
             {
                 TopicRouteData routeData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
+                clientMetadata.freshTopicRoute(topic, routeData);
+                if (routeData != null
+                    && !routeData.getQueueDatas().isEmpty()) {
+                    for (QueueData queueData: routeData.getQueueDatas()) {
+                        String bname = queueData.getBrokerName();
+                        String addr = 
clientMetadata.findMasterBrokerAddr(bname);
+                        TopicConfigAndQueueMapping mapping = 
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                        //allow the mapping info is null
+                        if (mapping != null) {
+                            existedTopicConfigMap.put(bname, mapping);
+                        }
+                    }
+                }
 
             }
-            TopicConfig topicConfig = new TopicConfig();
-            topicConfig.setReadQueueNums(8);
-            topicConfig.setWriteQueueNums(8);
-            topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
-
-            // readQueueNums
-            if (commandLine.hasOption('r')) {
-                
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
-            }
-
-            // writeQueueNums
-            if (commandLine.hasOption('w')) {
-                
topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
-            }
-
-            // perm
-            if (commandLine.hasOption('p')) {
-                
topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
-            }
-
-            boolean isUnit = false;
-            if (commandLine.hasOption('u')) {
-                isUnit = 
Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
-            }
-
-            boolean isCenterSync = false;
-            if (commandLine.hasOption('s')) {
-                isCenterSync = 
Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
+            {
+                if (!existedTopicConfigMap.isEmpty()) {
+                    Iterator<Map.Entry<String, TopicConfigAndQueueMapping>> it 
= existedTopicConfigMap.entrySet().iterator();
+                    Map.Entry<String, TopicConfigAndQueueMapping> first = 
it.next();
+                    validate(first, false);
+                    validateQueueMappingInfo(globalIdMap, 
first.getValue().getTopicQueueMappingInfo());
+                    TopicQueueMappingDetail firstMapping = 
first.getValue().getTopicQueueMappingInfo();
+                    while (it.hasNext()) {
+                        Map.Entry<String, TopicConfigAndQueueMapping> next = 
it.next();
+                        validate(next, false);
+                        validateQueueMappingInfo(globalIdMap, 
next.getValue().getTopicQueueMappingInfo());
+                        TopicQueueMappingDetail nextMapping = 
next.getValue().getTopicQueueMappingInfo();
+                        if (firstMapping.getEpoch() !=  
nextMapping.getEpoch()) {
+                            throw new RuntimeException(String.format("epoch 
dose not match %d != %d in %s %s", firstMapping.getEpoch(), 
nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname()));
+                        }
+                        if (firstMapping.getTotalQueues() !=  
nextMapping.getTotalQueues()) {
+                            throw new RuntimeException(String.format("total 
queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(), 
nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname()));
+                        }
+                    }
+                    if (firstMapping.getTotalQueues() != globalIdMap.size()) {
+                        throw new RuntimeException(String.format("The total 
queue number in config dose not match the real hosted queues %d != %d", 
firstMapping.getTotalQueues(), globalIdMap.size()));
+                    }
+                }
             }
-
-            int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, 
isCenterSync);
-            topicConfig.setTopicSysFlag(topicCenterSync);
-
-            boolean isOrder = false;
-            if (commandLine.hasOption('o')) {
-                isOrder = 
Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
+            if (queueNum < globalIdMap.size()) {
+                throw new RuntimeException(String.format("Cannot decrease the 
queue num for static topic %d < %d", queueNum, globalIdMap.size()));
             }
-            topicConfig.setOrder(isOrder);
-
-            boolean useLogicalQueue = false;
-            if (commandLine.hasOption("lq")) {
-                useLogicalQueue = 
Boolean.parseBoolean(commandLine.getOptionValue("lq").trim());
+            //check the queue number
+            if (queueNum == globalIdMap.size()) {
+                throw new RuntimeException("The topic queue num is equal the 
existed queue num, do nothing");
             }
+            //the check is ok, now do the real
 
-            if (commandLine.hasOption('b')) {
-                if (useLogicalQueue) {
-                    System.out.printf("-lq and -b can not be used 
together.%n");
-                    return;
-                }
-
-                String addr = commandLine.getOptionValue('b').trim();
-
-                defaultMQAdminExt.start();
-                defaultMQAdminExt.createAndUpdateTopicConfig(addr, 
topicConfig);
-
-                if (isOrder) {
-                    String brokerName = 
CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
-                    String orderConf = brokerName + ":" + 
topicConfig.getWriteQueueNums();
-                    
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), 
orderConf, false);
-                    System.out.printf("%s", String.format("set broker 
orderConf. isOrder=%s, orderConf=[%s]",
-                        isOrder, orderConf.toString()));
-                }
-                System.out.printf("create topic to %s success.%n", addr);
-                System.out.printf("%s", topicConfig);
-                return;
 
-            } else if (commandLine.hasOption('c')) {
-                String clusterName = commandLine.getOptionValue('c').trim();
-
-                defaultMQAdminExt.start();
-
-                Set<String> masterSet =
-                    
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
-
-                for (String addr : masterSet) {
-                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, 
topicConfig);
-                    System.out.printf("create topic to %s success.%n", addr);
-                }
-
-                if (isOrder) {
-                    Set<String> brokerNameSet =
-                        
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
-                    StringBuilder orderConf = new StringBuilder();
-                    String splitor = "";
-                    for (String s : brokerNameSet) {
-                        orderConf.append(splitor).append(s).append(":")
-                            .append(topicConfig.getWriteQueueNums());
-                        splitor = ";";
-                    }
-                    
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
-                        orderConf.toString(), true);
-                    System.out.printf("set cluster orderConf. isOrder=%s, 
orderConf=[%s]", isOrder, orderConf);
-                }
-
-                System.out.printf("%s", topicConfig);
-
-                if (useLogicalQueue) {
-                    new 
UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, 
topicConfig.getTopicName(), masterSet);
-                }
-                return;
-            }
 
             ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), 
options);
         } catch (Exception e) {

Reply via email to