This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new a5af2cf  Finish the UpdateStaticTopicSubCommand
a5af2cf is described below

commit a5af2cf519e7c2c2a306cc326926100fc75f57d3
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 23:05:03 2021 +0800

    Finish the UpdateStaticTopicSubCommand
---
 .../org/apache/rocketmq/common/TopicConfig.java    |   6 +
 .../common/TopicConfigAndQueueMapping.java         |  11 +-
 .../rocketmq/common/TopicQueueMappingInfo.java     |  11 +-
 .../rocketmq/common/TopicQueueMappingUtils.java    |  59 +++++--
 .../command/topic/UpdateStaticTopicSubCommand.java | 192 +++++++++++++--------
 5 files changed, 181 insertions(+), 98 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index c082ba6..ec4d54b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -47,6 +47,12 @@ public class TopicConfig {
         this.order = other.order;
     }
 
+    public TopicConfig(String topicName, int readQueueNums, int 
writeQueueNums) {
+        this.topicName = topicName;
+        this.readQueueNums = readQueueNums;
+        this.writeQueueNums = writeQueueNums;
+    }
+
     public TopicConfig(String topicName, int readQueueNums, int 
writeQueueNums, int perm) {
         this.topicName = topicName;
         this.readQueueNums = readQueueNums;
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index 3bc7f24..e2eece6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -17,18 +17,17 @@
 package org.apache.rocketmq.common;
 
 public class TopicConfigAndQueueMapping extends TopicConfig {
-    private TopicQueueMappingDetail topicQueueMappingDetail;
+    private TopicQueueMappingDetail mappingDetail;
 
     public TopicConfigAndQueueMapping() {
     }
 
-    public TopicConfigAndQueueMapping(TopicConfig topicConfig, 
TopicQueueMappingDetail topicQueueMappingDetail) {
+    public TopicConfigAndQueueMapping(TopicConfig topicConfig, 
TopicQueueMappingDetail mappingDetail) {
         super(topicConfig);
-        this.topicQueueMappingDetail = topicQueueMappingDetail;
+        this.mappingDetail = mappingDetail;
     }
 
-
-    public TopicQueueMappingDetail getTopicQueueMappingInfo() {
-        return topicQueueMappingDetail;
+    public TopicQueueMappingDetail getMappingDetail() {
+        return mappingDetail;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index b2e8591..b4bf776 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -55,9 +55,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
         return totalQueues;
     }
 
-    public void setTotalQueues(int totalQueues) {
-        this.totalQueues = totalQueues;
-    }
 
     public String getBname() {
         return bname;
@@ -71,6 +68,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
         return epoch;
     }
 
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public void setTotalQueues(int totalQueues) {
+        this.totalQueues = totalQueues;
+    }
+
     public ConcurrentMap<Integer, Integer> getCurrIdMap() {
         return currIdMap;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 93345df..87c54a3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -29,16 +29,18 @@ import java.util.Random;
 
 public class TopicQueueMappingUtils {
 
-    public static class MappingState {
+    public static class MappingAllocator {
         Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
+        Map<Integer, String> idToBroker = new HashMap<Integer, String>();
         int currentIndex = 0;
         Random random = new Random();
         List<String> leastBrokers = new ArrayList<String>();
-        private MappingState(Map<String, Integer> brokerNumMap) {
+        private MappingAllocator(Map<Integer, String> idToBroker, Map<String, 
Integer> brokerNumMap) {
+            this.idToBroker.putAll(idToBroker);
             this.brokerNumMap.putAll(brokerNumMap);
         }
 
-        public void freshState() {
+        private void freshState() {
             int minNum = -1;
             for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
                 if (entry.getValue() > minNum) {
@@ -50,20 +52,41 @@ public class TopicQueueMappingUtils {
             }
             currentIndex = random.nextInt(leastBrokers.size());
         }
-
-        public String nextBroker() {
+        private String nextBroker() {
             if (leastBrokers.isEmpty()) {
                 freshState();
             }
-            int tmpIndex = (++currentIndex) % leastBrokers.size();
-            String broker = leastBrokers.remove(tmpIndex);
-            currentIndex--;
-            return broker;
+            int tmpIndex = currentIndex % leastBrokers.size();
+            return leastBrokers.remove(tmpIndex);
+        }
+
+        public Map<String, Integer> getBrokerNumMap() {
+            return brokerNumMap;
+        }
+
+        public void upToNum(int maxQueueNum) {
+            int currSize = idToBroker.size();
+            if (maxQueueNum <= currSize) {
+                return;
+            }
+            for (int i = currSize; i < maxQueueNum; i++) {
+                String nextBroker = nextBroker();
+                if (brokerNumMap.containsKey(nextBroker)) {
+                    brokerNumMap.put(nextBroker, brokerNumMap.get(nextBroker) 
+ 1);
+                } else {
+                    brokerNumMap.put(nextBroker, 1);
+                }
+                idToBroker.put(i, nextBroker);
+            }
+        }
+
+        public Map<Integer, String> getIdToBroker() {
+            return idToBroker;
         }
     }
 
-    public static MappingState buildMappingState(Map<String, Integer> 
brokerNumMap) {
-        return new MappingState(brokerNumMap);
+    public static MappingAllocator buildMappingAllocator(Map<Integer, String> 
idToBroker, Map<String, Integer> brokerNumMap) {
+        return new MappingAllocator(idToBroker, brokerNumMap);
     }
 
     public static Map.Entry<Integer, Integer> 
findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
@@ -92,14 +115,14 @@ public class TopicQueueMappingUtils {
         for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
             for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  
entry : mappingDetail.getHostedQueues().entrySet()) {
                 Integer globalid = entry.getKey();
-                String leaerBrokerName  = 
entry.getValue().iterator().next().getBname();
-                if (!leaerBrokerName.equals(mappingDetail.getBname())) {
+                String leaderBrokerName  = getLeaderBroker(entry.getValue());
+                if (!leaderBrokerName.equals(mappingDetail.getBname())) {
                     //not the leader
                     continue;
                 }
                 if (globalIdMap.containsKey(globalid)) {
                     if (!replace) {
-                        throw new RuntimeException(String.format("The queue id 
is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
+                        throw new RuntimeException(String.format("The queue id 
is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
                     }
                 } else {
                     globalIdMap.put(globalid, entry.getValue());
@@ -108,4 +131,12 @@ public class TopicQueueMappingUtils {
         }
         return globalIdMap;
     }
+
+    public static String getLeaderBroker(ImmutableList<LogicQueueMappingItem> 
items) {
+        return getLeaderItem(items).getBname();
+    }
+    public static LogicQueueMappingItem 
getLeaderItem(ImmutableList<LogicQueueMappingItem> items) {
+        assert items.size() > 0;
+        return items.get(items.size() - 1);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 253d19c..a1d3f6f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.common.LogicQueueMappingItem;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.TopicQueueMappingUtils;
@@ -35,11 +36,13 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class UpdateStaticTopicSubCommand implements SubCommand {
@@ -77,18 +80,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         return options;
     }
 
-    private void validateIfNull(Map.Entry<String, TopicConfigAndQueueMapping> 
entry, boolean shouldNull) {
-        if (shouldNull) {
-            if (entry.getValue().getTopicQueueMappingInfo() != null) {
-                throw new RuntimeException("Mapping info should be null in 
broker " + entry.getKey());
-            }
-        } else {
-            if (entry.getValue().getTopicQueueMappingInfo() == null) {
-                throw new RuntimeException("Mapping info should not be null in 
broker  " + entry.getKey());
-            }
-        }
-    }
-
 
     @Override
     public void execute(final CommandLine commandLine, final Options options,
@@ -100,73 +91,88 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new 
HashMap<>();
         Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new 
HashMap<>();
         try {
-
+            if (!commandLine.hasOption('t')
+                    || !commandLine.hasOption('c')
+                    || !commandLine.hasOption("qn")) {
+                ServerUtil.printCommandLineHelp("mqadmin " + 
this.commandName(), options);
+                return;
+            }
             String topic = commandLine.getOptionValue('t').trim();
             int queueNum = 
Integer.parseInt(commandLine.getOptionValue("qn").trim());
-            String cluster = commandLine.getOptionValue('c').trim();
+            String clusters = commandLine.getOptionValue('c').trim();
             ClusterInfo clusterInfo  = 
defaultMQAdminExt.examineBrokerClusterInfo();
             if (clusterInfo == null
-                    || clusterInfo.getClusterAddrTable().isEmpty()
-                    || clusterInfo.getClusterAddrTable().get(cluster) == null
-                    || 
clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) {
-                throw new RuntimeException("The Cluster info is null for " + 
cluster);
+                    || clusterInfo.getClusterAddrTable().isEmpty()) {
+                throw new RuntimeException("The Cluster info is empty");
+            } else {
+                clientMetadata.refreshClusterInfo(clusterInfo);
             }
-            clientMetadata.refreshClusterInfo(clusterInfo);
-            //first get the existed topic config and mapping
-            {
-                TopicRouteData routeData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
-                clientMetadata.freshTopicRoute(topic, routeData);
-                if (routeData != null
+            Set<String> brokers = new HashSet<>();
+            for (String cluster : clusters.split(",")) {
+                cluster = cluster.trim();
+                if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+                    
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+                }
+            }
+            if (brokers.isEmpty()) {
+                throw new RuntimeException("Find none brokers for " + 
clusters);
+            }
+
+            //get the existed topic config and mapping
+            TopicRouteData routeData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
+            clientMetadata.freshTopicRoute(topic, routeData);
+            if (routeData != null
                     && !routeData.getQueueDatas().isEmpty()) {
-                    for (QueueData queueData: routeData.getQueueDatas()) {
-                        String bname = queueData.getBrokerName();
-                        String addr = 
clientMetadata.findMasterBrokerAddr(bname);
-                        TopicConfigAndQueueMapping mapping = 
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                        //allow the mapping info is null
-                        if (mapping != null) {
-                            existedTopicConfigMap.put(bname, mapping);
-                        }
+                for (QueueData queueData: routeData.getQueueDatas()) {
+                    String bname = queueData.getBrokerName();
+                    String addr = clientMetadata.findMasterBrokerAddr(bname);
+                    TopicConfigAndQueueMapping mapping = 
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                    //allow the config is null
+                    if (mapping != null) {
+                        existedTopicConfigMap.put(bname, mapping);
                     }
                 }
             }
-            // the
-            {
-                if (!existedTopicConfigMap.isEmpty()) {
-                    //make sure it it not null
-                    existedTopicConfigMap.entrySet().forEach(entry -> {
-                        validateIfNull(entry, false);
-                    });
-                    //make sure the detail is not dirty
-                    existedTopicConfigMap.entrySet().forEach(entry -> {
-                        if 
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
 {
-                            throw new RuntimeException(String.format("The 
broker name is not equal %s != %s ",  entry.getKey(), 
entry.getValue().getTopicQueueMappingInfo().getBname()));
-                        }
-                        if 
(entry.getValue().getTopicQueueMappingInfo().isDirty()) {
-                            throw new RuntimeException("The mapping info is 
dirty in broker  " + entry.getValue().getTopicQueueMappingInfo().getBname());
-                        }
-                    });
-
-                    List<TopicQueueMappingDetail> detailList = 
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList());
-                    //check the epoch and qnum
-                    Map.Entry<Integer, Integer> maxEpochAndNum = 
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-                    detailList.forEach( mappingDetail -> {
-                        if (maxEpochAndNum.getKey() != 
mappingDetail.getEpoch()) {
-                            throw new RuntimeException(String.format("epoch 
dose not match %d != %d in %s", maxEpochAndNum.getKey(), 
mappingDetail.getEpoch(), mappingDetail.getBname()));
-                        }
-                        if (maxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
-                            throw new RuntimeException(String.format("total 
queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
-                        }
-                    });
-
-                    globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false);
-
-                    if (maxEpochAndNum.getValue() != globalIdMap.size()) {
-                        throw new RuntimeException(String.format("The total 
queue number in config dose not match the real hosted queues %d != %d", 
maxEpochAndNum.getValue(), globalIdMap.size()));
+
+            Map.Entry<Integer, Integer> maxEpochAndNum = new 
AbstractMap.SimpleImmutableEntry<>(-1, queueNum);
+            if (!existedTopicConfigMap.isEmpty()) {
+                //make sure it it not null
+                existedTopicConfigMap.forEach((key, value) -> {
+                    if (value.getMappingDetail() != null) {
+                        throw new RuntimeException("Mapping info should be 
null in broker " + key);
+                    }
+                });
+                //make sure the detail is not dirty
+                existedTopicConfigMap.forEach((key, value) -> {
+                    if (!key.equals(value.getMappingDetail().getBname())) {
+                        throw new RuntimeException(String.format("The broker 
name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
                     }
-                    for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
-                        if (!globalIdMap.containsKey(i)) {
-                            throw new RuntimeException(String.format("The 
queue number %s is not in globalIdMap", i));
-                        }
+                    if (value.getMappingDetail().isDirty()) {
+                        throw new RuntimeException("The mapping info is dirty 
in broker  " + value.getMappingDetail().getBname());
+                    }
+                });
+
+                List<TopicQueueMappingDetail> detailList = 
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
+                //check the epoch and qnum
+                maxEpochAndNum = 
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+                final Map.Entry<Integer, Integer> tmpMaxEpochAndNum = 
maxEpochAndNum;
+                detailList.forEach( mappingDetail -> {
+                    if (tmpMaxEpochAndNum.getKey() != 
mappingDetail.getEpoch()) {
+                        throw new RuntimeException(String.format("epoch dose 
not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), 
mappingDetail.getEpoch(), mappingDetail.getBname()));
+                    }
+                    if (tmpMaxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
+                        throw new RuntimeException(String.format("total queue 
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
+                    }
+                });
+
+                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false);
+
+                if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+                    throw new RuntimeException(String.format("The total queue 
number in config dose not match the real hosted queues %d != %d", 
maxEpochAndNum.getValue(), globalIdMap.size()));
+                }
+                for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+                    if (!globalIdMap.containsKey(i)) {
+                        throw new RuntimeException(String.format("The queue 
number %s is not in globalIdMap", i));
                     }
                 }
             }
@@ -177,11 +183,47 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             if (queueNum == globalIdMap.size()) {
                 throw new RuntimeException("The topic queue num is equal the 
existed queue num, do nothing");
             }
-            //the check is ok, now do the real
-
-
-
-            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), 
options);
+            //the check is ok, now do the mapping allocation
+            Map<String, Integer> brokerNumMap = 
brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
+            Map<Integer, String> idToBroker = new HashMap<>();
+            globalIdMap.forEach((key, value) -> {
+                String leaderbroker = 
TopicQueueMappingUtils.getLeaderBroker(value);
+                idToBroker.put(key, leaderbroker);
+                if (!brokerNumMap.containsKey(leaderbroker)) {
+                    brokerNumMap.put(leaderbroker, 1);
+                } else {
+                    brokerNumMap.put(leaderbroker, 
brokerNumMap.get(leaderbroker) + 1);
+                }
+            });
+            TopicQueueMappingUtils.MappingAllocator allocator = 
TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
+            allocator.upToNum(queueNum);
+            Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
+
+            //construct the topic configAndMapping
+            int epoch = maxEpochAndNum.getKey() + 1;
+            newIdToBroker.forEach( (queueId, broker) -> {
+                TopicConfigAndQueueMapping configMapping;
+                if (!existedTopicConfigMap.containsKey(broker)) {
+                    TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
+                    TopicQueueMappingDetail mappingDetail = new 
TopicQueueMappingDetail(topic, queueNum, broker, epoch);
+                    configMapping = new 
TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+                } else {
+                    configMapping = existedTopicConfigMap.get(broker);
+                    
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+                    
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+                    configMapping.getMappingDetail().setEpoch(epoch);
+                    configMapping.getMappingDetail().setTotalQueues(queueNum);
+                }
+                LogicQueueMappingItem mappingItem = new 
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, 
-1, -1, -1);
+                configMapping.getMappingDetail().putMappingInfo(queueId, 
ImmutableList.of(mappingItem));
+            });
+
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : 
existedTopicConfigMap.entrySet()) {
+                String broker = entry.getKey();
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                TopicConfigAndQueueMapping configMapping = entry.getValue();
+                defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail());
+            }
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
         } finally {

Reply via email to