This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new 295a6bd  Polish the update utils
295a6bd is described below

commit 295a6bde63210d73570dcdfc71ccd100a3c87444
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 21:49:27 2021 +0800

    Polish the update utils
---
 .../rocketmq/common/TopicQueueMappingUtils.java    | 111 +++++++++++++++++++++
 .../command/topic/UpdateStaticTopicSubCommand.java |  78 +++++++--------
 2 files changed, 149 insertions(+), 40 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
new file mode 100644
index 0000000..93345df
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TopicQueueMappingUtils {
+
+    /**
+     * Keeps a per-broker count (presumably the number of queues already
+     * assigned to each broker - confirm against callers) and hands out the
+     * brokers that currently have the smallest count, one at a time.
+     *
+     * Instances are created through
+     * {@link TopicQueueMappingUtils#buildMappingState(Map)}.
+     */
+    public static class MappingState {
+        Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
+        int currentIndex = 0;
+        Random random = new Random();
+        List<String> leastBrokers = new ArrayList<String>();
+
+        private MappingState(Map<String, Integer> brokerNumMap) {
+            // Copy, so later changes to the caller's map do not leak in.
+            this.brokerNumMap.putAll(brokerNumMap);
+        }
+
+        /**
+         * Rebuilds {@code leastBrokers} with every broker whose count equals
+         * the minimum count in {@code brokerNumMap}, then picks a random
+         * position in that list as the new {@code currentIndex}.
+         *
+         * @throws IllegalArgumentException if {@code brokerNumMap} is empty
+         */
+        public void freshState() {
+            if (brokerNumMap.isEmpty()) {
+                throw new IllegalArgumentException("No broker is available in the mapping state");
+            }
+            leastBrokers.clear();
+            // Start from the largest possible value and track the running
+            // minimum; the previous version never updated minNum and compared
+            // with '>', so it kept only the last iterated broker.
+            int minNum = Integer.MAX_VALUE;
+            for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
+                int num = entry.getValue();
+                if (num < minNum) {
+                    leastBrokers.clear();
+                    leastBrokers.add(entry.getKey());
+                    minNum = num;
+                } else if (num == minNum) {
+                    leastBrokers.add(entry.getKey());
+                }
+            }
+            currentIndex = random.nextInt(leastBrokers.size());
+        }
+
+        /**
+         * Removes and returns one broker from {@code leastBrokers},
+         * refreshing the list first when it is empty.
+         *
+         * NOTE: the count stored in {@code brokerNumMap} is not changed here.
+         *
+         * @return the name of the selected broker
+         * @throws IllegalArgumentException if {@code brokerNumMap} is empty
+         */
+        public String nextBroker() {
+            if (leastBrokers.isEmpty()) {
+                freshState();
+            }
+            // Take the element right after currentIndex (wrapping around);
+            // currentIndex itself is left untouched.
+            int tmpIndex = (currentIndex + 1) % leastBrokers.size();
+            return leastBrokers.remove(tmpIndex);
+        }
+    }
+
+    /**
+     * Factory for {@link MappingState}.
+     *
+     * @param brokerNumMap per-broker counts used to seed the new state
+     * @return a new state built from the given map
+     */
+    public static MappingState buildMappingState(Map<String, Integer> brokerNumMap) {
+        final MappingState state = new MappingState(brokerNumMap);
+        return state;
+    }
+
+    /**
+     * Scans the given mapping details for the largest epoch and the largest
+     * total queue number.
+     *
+     * @param mappingDetailList the mapping details to inspect
+     * @return an immutable entry whose key is the max epoch ({@code -1} when
+     *         the list is empty) and whose value is the max total queue
+     *         number ({@code 0} when the list is empty)
+     */
+    public static Map.Entry<Integer, Integer> findMaxEpochAndQueueNum(
+            List<TopicQueueMappingDetail> mappingDetailList) {
+        int maxEpoch = -1;
+        int maxQueueNum = 0;
+        for (TopicQueueMappingDetail detail : mappingDetailList) {
+            maxEpoch = Math.max(maxEpoch, detail.getEpoch());
+            maxQueueNum = Math.max(maxQueueNum, detail.getTotalQueues());
+        }
+        return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(maxEpoch, maxQueueNum);
+    }
+
+    /**
+     * Merges the hosted queues of all given mapping details into one map
+     * keyed by global queue id.
+     *
+     * The list is sorted in place by epoch, highest first, so callers that
+     * need the original order should pass a copy. An entry is only taken from
+     * a detail whose broker name equals the broker name of the first item in
+     * that entry's list (treated as the leader). Because of the sort order,
+     * the first value stored for an id comes from the highest epoch and is
+     * never overwritten.
+     *
+     * @param mappingDetailList the mapping details to merge; sorted in place
+     * @param replace when {@code false}, a global id seen twice is an error;
+     *                when {@code true}, later duplicates are silently ignored
+     * @return a map from global queue id to its mapping items
+     * @throws RuntimeException if a global id is duplicated and
+     *                          {@code replace} is {@code false}
+     */
+    public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(
+            List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
+        // Descending by epoch. Integer.compare avoids the overflow that the
+        // subtraction idiom (o2 - o1) can hit on extreme values.
+        Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
+            @Override
+            public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
+                return Integer.compare(o2.getEpoch(), o1.getEpoch());
+            }
+        });
+
+        Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap =
+            new HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+        for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
+            for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry
+                    : mappingDetail.getHostedQueues().entrySet()) {
+                Integer globalid = entry.getKey();
+                String leaderBrokerName = entry.getValue().iterator().next().getBname();
+                if (!leaderBrokerName.equals(mappingDetail.getBname())) {
+                    //not the leader
+                    continue;
+                }
+                if (globalIdMap.containsKey(globalid)) {
+                    if (!replace) {
+                        throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
+                    }
+                    // replace == true: keep the value already stored (higher epoch).
+                } else {
+                    globalIdMap.put(globalid, entry.getValue());
+                }
+            }
+        }
+        return globalIdMap;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 05f2807..253d19c 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
 import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -34,9 +35,12 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class UpdateStaticTopicSubCommand implements SubCommand {
 
@@ -73,7 +77,7 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
         return options;
     }
 
-    private void validate(Map.Entry<String, TopicConfigAndQueueMapping> entry, 
boolean shouldNull) {
+    private void validateIfNull(Map.Entry<String, TopicConfigAndQueueMapping> 
entry, boolean shouldNull) {
         if (shouldNull) {
             if (entry.getValue().getTopicQueueMappingInfo() != null) {
                 throw new RuntimeException("Mapping info should be null in 
broker " + entry.getKey());
@@ -82,30 +86,9 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
             if (entry.getValue().getTopicQueueMappingInfo() == null) {
                 throw new RuntimeException("Mapping info should not be null in 
broker  " + entry.getKey());
             }
-            if 
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
 {
-                throw new RuntimeException(String.format("The broker name is 
not equal %s != %s ",  entry.getKey(), 
entry.getValue().getTopicQueueMappingInfo().getBname()));
-            }
         }
     }
 
-    public void validateQueueMappingInfo(Map<Integer, 
ImmutableList<LogicQueueMappingItem>> globalIdMap, TopicQueueMappingDetail 
mappingDetail) {
-        if (mappingDetail.isDirty()) {
-            throw new RuntimeException("The mapping info is dirty in broker  " 
+ mappingDetail.getBname());
-        }
-        for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  entry : 
mappingDetail.getHostedQueues().entrySet()) {
-            Integer globalid = entry.getKey();
-            String leaerBrokerName  = 
entry.getValue().iterator().next().getBname();
-            if (!leaerBrokerName.equals(mappingDetail.getBname())) {
-                //not the leader
-                continue;
-            }
-            if (globalIdMap.containsKey(globalid)) {
-                throw new RuntimeException(String.format("The queue id is 
duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
-            } else {
-                globalIdMap.put(globalid, entry.getValue());
-            }
-        }
-    }
 
     @Override
     public void execute(final CommandLine commandLine, final Options options,
@@ -129,7 +112,6 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
                 throw new RuntimeException("The Cluster info is null for " + 
cluster);
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-
             //first get the existed topic config and mapping
             {
                 TopicRouteData routeData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -146,29 +128,45 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
                         }
                     }
                 }
-
             }
+            // validate the existing topic config and mapping
             {
                 if (!existedTopicConfigMap.isEmpty()) {
-                    Iterator<Map.Entry<String, TopicConfigAndQueueMapping>> it 
= existedTopicConfigMap.entrySet().iterator();
-                    Map.Entry<String, TopicConfigAndQueueMapping> first = 
it.next();
-                    validate(first, false);
-                    validateQueueMappingInfo(globalIdMap, 
first.getValue().getTopicQueueMappingInfo());
-                    TopicQueueMappingDetail firstMapping = 
first.getValue().getTopicQueueMappingInfo();
-                    while (it.hasNext()) {
-                        Map.Entry<String, TopicConfigAndQueueMapping> next = 
it.next();
-                        validate(next, false);
-                        validateQueueMappingInfo(globalIdMap, 
next.getValue().getTopicQueueMappingInfo());
-                        TopicQueueMappingDetail nextMapping = 
next.getValue().getTopicQueueMappingInfo();
-                        if (firstMapping.getEpoch() !=  
nextMapping.getEpoch()) {
-                            throw new RuntimeException(String.format("epoch 
dose not match %d != %d in %s %s", firstMapping.getEpoch(), 
nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname()));
+                    //make sure it is not null
+                    existedTopicConfigMap.entrySet().forEach(entry -> {
+                        validateIfNull(entry, false);
+                    });
+                    //make sure the detail is not dirty
+                    existedTopicConfigMap.entrySet().forEach(entry -> {
+                        if 
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
 {
+                            throw new RuntimeException(String.format("The 
broker name is not equal %s != %s ",  entry.getKey(), 
entry.getValue().getTopicQueueMappingInfo().getBname()));
+                        }
+                        if 
(entry.getValue().getTopicQueueMappingInfo().isDirty()) {
+                            throw new RuntimeException("The mapping info is 
dirty in broker  " + entry.getValue().getTopicQueueMappingInfo().getBname());
+                        }
+                    });
+
+                    List<TopicQueueMappingDetail> detailList = 
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList());
+                    //check the epoch and qnum
+                    Map.Entry<Integer, Integer> maxEpochAndNum = 
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+                    detailList.forEach( mappingDetail -> {
+                        if (maxEpochAndNum.getKey() != 
mappingDetail.getEpoch()) {
+                            throw new RuntimeException(String.format("epoch 
dose not match %d != %d in %s", maxEpochAndNum.getKey(), 
mappingDetail.getEpoch(), mappingDetail.getBname()));
                         }
-                        if (firstMapping.getTotalQueues() !=  
nextMapping.getTotalQueues()) {
-                            throw new RuntimeException(String.format("total 
queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(), 
nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname()));
+                        if (maxEpochAndNum.getValue() != 
mappingDetail.getTotalQueues()) {
+                            throw new RuntimeException(String.format("total 
queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), 
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
                         }
+                    });
+
+                    globalIdMap = TopicQueueMappingUtils.buildMappingItems(new 
ArrayList<>(detailList), false);
+
+                    if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+                        throw new RuntimeException(String.format("The total 
queue number in config dose not match the real hosted queues %d != %d", 
maxEpochAndNum.getValue(), globalIdMap.size()));
                     }
-                    if (firstMapping.getTotalQueues() != globalIdMap.size()) {
-                        throw new RuntimeException(String.format("The total 
queue number in config dose not match the real hosted queues %d != %d", 
firstMapping.getTotalQueues(), globalIdMap.size()));
+                    for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+                        if (!globalIdMap.containsKey(i)) {
+                            throw new RuntimeException(String.format("The 
queue number %s is not in globalIdMap", i));
+                        }
                     }
                 }
             }

Reply via email to