This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 82364fc  Polish the tools
82364fc is described below

commit 82364fc28592980c8215edfaf4e20e254c39a861
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 18 15:52:54 2021 +0800

    Polish the tools
---
 .../common/statictopic/TopicQueueMappingUtils.java | 16 +++++++-
 .../MigrateTopicLogicalQueueCommand.java           |  2 +-
 .../topic/RemappingStaticTopicSubCommand.java      | 24 ++++-------
 .../command/topic/UpdateStaticTopicSubCommand.java | 47 ++++++++++------------
 4 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 27dcc0e..dfb6bbf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, 
queueNum);
     }
 
-    public static Map<Integer, TopicQueueMappingOne> 
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean 
replace) {
+    public static Map<Integer, TopicQueueMappingOne> 
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean 
replace, boolean checkConsistence) {
         Collections.sort(mappingDetailList, new 
Comparator<TopicQueueMappingDetail>() {
             @Override
             public int compare(TopicQueueMappingDetail o1, 
TopicQueueMappingDetail o2) {
@@ -111,8 +111,12 @@ public class TopicQueueMappingUtils {
             }
         });
 
+        int maxNum = 0;
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, 
TopicQueueMappingOne>();
         for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
+            if (mappingDetail.totalQueues > maxNum) {
+                maxNum = mappingDetail.totalQueues;
+            }
             for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  
entry : mappingDetail.getHostedQueues().entrySet()) {
                 Integer globalid = entry.getKey();
                 String leaderBrokerName  = getLeaderBroker(entry.getValue());
@@ -129,6 +133,16 @@ public class TopicQueueMappingUtils {
                 }
             }
         }
+        if (checkConsistence) {
+            if (maxNum != globalIdMap.size()) {
+                throw new RuntimeException(String.format("The total queue 
number in config dose not match the real hosted queues %d != %d", maxNum, 
globalIdMap.size()));
+            }
+            for (int i = 0; i < maxNum; i++) {
+                if (!globalIdMap.containsKey(i)) {
+                    throw new RuntimeException(String.format("The queue number 
%s is not in globalIdMap", i));
+                }
+            }
+        }
         return globalIdMap;
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
index 5da8b0d..a3f212c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
@@ -87,7 +87,7 @@ public class MigrateTopicLogicalQueueCommand implements 
SubCommand {
         String toBrokerName,
         Long forceDelta) throws RemotingException, MQBrokerException, 
InterruptedException, SubCommandException, MQClientException {
         TopicRouteData topicRouteInfo = 
mqAdminExt.examineTopicRouteInfo(topic);
-        LogicalQueuesInfo logicalQueuesInfo = 
topicRouteInfo.getLogicalQueuesInfo();
+        LogicalQueuesInfo logicalQueuesInfo =  null; 
/*topicRouteInfo.getLogicalQueuesInfo();*/
         if (logicalQueuesInfo == null) {
             throw new SubCommandException("topic not enabled logical queue");
         }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index edae005..472ce0b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -172,26 +172,15 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
             List<TopicQueueMappingDetail> detailList = 
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
             //check the epoch and qnum
             maxEpochAndNum = 
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-            final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
-            detailList.forEach( mappingDetail -> {
-                if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
-                    throw new RuntimeException(String.format("epoch dose not 
match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), 
mappingDetail.getBname()));
+            for (TopicQueueMappingDetail mappingDetail : detailList) {
+                if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+                    throw new RuntimeException(String.format("epoch dose not 
match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), 
mappingDetail.getBname()));
                 }
-                if (tmpMaxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
-                    throw new RuntimeException(String.format("total queue 
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
-                }
-            });
-
-            globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false);
-
-            if (maxEpochAndNum.getValue() != globalIdMap.size()) {
-                throw new RuntimeException(String.format("The total queue 
number in config dose not match the real hosted queues %d != %d", 
maxEpochAndNum.getValue(), globalIdMap.size()));
-            }
-            for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
-                if (!globalIdMap.containsKey(i)) {
-                    throw new RuntimeException(String.format("The queue number 
%s is not in globalIdMap", i));
+                if (maxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
+                    throw new RuntimeException(String.format("total queue 
number dose not match %d != %d in %s", maxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
                 }
             }
+            globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false, true);
 
             //the check is ok, now do the mapping allocation
             int maxNum = maxEpochAndNum.getValue();
@@ -262,6 +251,7 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
                 configMapping.getMappingDetail().setEpoch(epoch);
                 configMapping.getMappingDetail().setTotalQueues(maxNum);
             });
+            TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())),
 false, true);
             // now do the remapping
             //Step1: let the new leader can be write without the logicOffset
             for (String broker: brokersToMapIn) {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index d25fb70..65c7377 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -64,7 +64,10 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
 
         Option opt = null;
 
-        opt = new Option("c", "clusters", true, "create topic to clusters, 
comma separated");
+        opt = new Option("c", "clusters", true, "remapping static topic to 
clusters, comma separated");
+        optionGroup.addOption(opt);
+
+        opt = new Option("b", "brokers", true, "remapping static topic to 
brokers, comma separated");
         optionGroup.addOption(opt);
 
         optionGroup.setRequired(true);
@@ -94,22 +97,23 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
         Set<String> brokers = new HashSet<>();
 
         try {
-            if (!commandLine.hasOption('t')
-                    || !commandLine.hasOption('c')
+            if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
+                    || !commandLine.hasOption('t')
                     || !commandLine.hasOption("qn")) {
                 ServerUtil.printCommandLineHelp("mqadmin " + 
this.commandName(), options);
                 return;
             }
-            String topic = commandLine.getOptionValue('t').trim();
-            int queueNum = 
Integer.parseInt(commandLine.getOptionValue("qn").trim());
-            String clusters = commandLine.getOptionValue('c').trim();
+
             ClusterInfo clusterInfo  = 
defaultMQAdminExt.examineBrokerClusterInfo();
             if (clusterInfo == null
                     || clusterInfo.getClusterAddrTable().isEmpty()) {
                 throw new RuntimeException("The Cluster info is empty");
-            } else {
-                clientMetadata.refreshClusterInfo(clusterInfo);
             }
+            clientMetadata.refreshClusterInfo(clusterInfo);
+
+            String topic = commandLine.getOptionValue('t').trim();
+            int queueNum = 
Integer.parseInt(commandLine.getOptionValue("qn").trim());
+            String clusters = commandLine.getOptionValue('c').trim();
             for (String cluster : clusters.split(",")) {
                 cluster = cluster.trim();
                 if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
@@ -163,26 +167,15 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
                 List<TopicQueueMappingDetail> detailList = 
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
                 //check the epoch and qnum
                 maxEpochAndNum = 
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-                final Map.Entry<Long, Integer> tmpMaxEpochAndNum = 
maxEpochAndNum;
-                detailList.forEach( mappingDetail -> {
-                    if (tmpMaxEpochAndNum.getKey() != 
mappingDetail.getEpoch()) {
-                        throw new RuntimeException(String.format("epoch dose 
not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), 
mappingDetail.getEpoch(), mappingDetail.getBname()));
-                    }
-                    if (tmpMaxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
-                        throw new RuntimeException(String.format("total queue 
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
+                for (TopicQueueMappingDetail mappingDetail : detailList) {
+                    if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+                        throw new RuntimeException(String.format("epoch dose 
not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), 
mappingDetail.getBname()));
                     }
-                });
-
-                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false);
-
-                if (maxEpochAndNum.getValue() != globalIdMap.size()) {
-                    throw new RuntimeException(String.format("The total queue 
number in config dose not match the real hosted queues %d != %d", 
maxEpochAndNum.getValue(), globalIdMap.size()));
-                }
-                for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
-                    if (!globalIdMap.containsKey(i)) {
-                        throw new RuntimeException(String.format("The queue 
number %s is not in globalIdMap", i));
+                    if (maxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
+                        throw new RuntimeException(String.format("total queue 
number dose not match %d != %d in %s", maxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
                     }
                 }
+                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false, true);
             }
             if (queueNum < globalIdMap.size()) {
                 throw new RuntimeException(String.format("Cannot decrease the 
queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -230,10 +223,14 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
                 LogicQueueMappingItem mappingItem = new 
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, 
-1, -1, -1);
                 configMapping.getMappingDetail().putMappingInfo(queueId, 
ImmutableList.of(mappingItem));
             }
+
+            //double check the topic config map
             existedTopicConfigMap.values().forEach( configMapping -> {
                 configMapping.getMappingDetail().setEpoch(epoch);
                 configMapping.getMappingDetail().setTotalQueues(queueNum);
             });
+            TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())),
 false, true);
+
             //If some succeed, and others fail, it will cause inconsistent data
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry : 
existedTopicConfigMap.entrySet()) {
                 String broker = entry.getKey();

Reply via email to