This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new 4e9b097  Check the correctness of logic items
4e9b097 is described below

commit 4e9b097478e7de052a14cd883b47858e6a47b94e
Author: dongeforever <[email protected]>
AuthorDate: Fri Nov 19 20:00:23 2021 +0800

    Check the correctness of logic items
---
 .../common/statictopic/LogicQueueMappingItem.java  |  2 +-
 .../common/statictopic/TopicQueueMappingUtils.java | 67 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 3ab69d7..01686fd 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -7,7 +7,7 @@ public class LogicQueueMappingItem {
     private String bname;
     private long logicOffset; // the start of the logic offset
     private long startOffset; // the start of the physical offset, included
-    private long endOffset; // the end of the physical offset, excluded
+    private long endOffset = -1; // the end of the physical offset, excluded
     private long timeOfStart = -1; // mutable
     private long timeOfEnd = -1; // mutable
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 545b7cf..370f0ec 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -162,6 +162,72 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
+    public static void 
checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems, 
ImmutableList<LogicQueueMappingItem> newItems) {
+        if (oldItems == null || oldItems.isEmpty()) {
+            return;
+        }
+        if (newItems == null || newItems.isEmpty() || newItems.size() < 
oldItems.size()) {
+            throw new RuntimeException("The new item list is smaller than old 
ones");
+        }
+        int iold = 0, inew = 0;
+        while (iold < oldItems.size() && inew < newItems.size()) {
+            LogicQueueMappingItem newItem = newItems.get(inew);
+            LogicQueueMappingItem oldItem = oldItems.get(iold);
+            if (newItem.getGen() < oldItem.getGen()) {
+                inew++;
+                continue;
+            } else if (oldItem.getGen() < newItem.getGen()){
+                throw new RuntimeException("The gen is not correct for old 
item");
+            } else {
+                assert oldItem.getBname().equals(newItem.getBname());
+                assert oldItem.getQueueId() == newItem.getQueueId();
+                assert oldItem.getStartOffset() == newItem.getStartOffset();
+                if (oldItem.getLogicOffset() != -1) {
+                    assert oldItem.getLogicOffset() == 
newItem.getLogicOffset();
+                }
+                iold++;
+                inew++;
+            }
+        }
+    }
+
+
+    public static void 
checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> items) {
+        if (items == null
+            || items.isEmpty()) {
+            return;
+        }
+        int lastGen = -1;
+        long lastOffset = -1;
+        for (int i = items.size() - 1; i >=0 ; i--) {
+            LogicQueueMappingItem item = items.get(i);
+            if (item.getStartOffset() < 0
+                    || item.getGen() < 0
+                    || item.getQueueId() < 0) {
+                throw new RuntimeException("The field is illegal, should not 
be negative");
+            }
+            if (lastGen != -1 && item.getGen() >= lastGen) {
+                throw new RuntimeException("The gen dose not increase 
monotonically");
+            }
+
+            if (item.getEndOffset() != -1
+                && item.getEndOffset() < item.getStartOffset()) {
+                throw new RuntimeException("The endOffset is smaller than the 
start offset");
+            }
+
+            if (lastOffset != -1 && item.getLogicOffset() != -1) {
+                if (item.getLogicOffset() >= lastOffset) {
+                    throw new RuntimeException("The base logic offset dose not 
increase monotonically");
+                }
+                if (item.computeMaxStaticQueueOffset() >= lastOffset) {
+                    throw new RuntimeException("The max logic offset dose not 
increase monotonically");
+                }
+            }
+            lastGen = item.getGen();
+            lastOffset = item.getLogicOffset();
+        }
+    }
+
     public static Map<Integer, TopicQueueMappingOne> 
checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, 
boolean replace, boolean checkConsistence) {
         Collections.sort(mappingDetailList, new 
Comparator<TopicQueueMappingDetail>() {
             @Override
@@ -178,6 +244,7 @@ public class TopicQueueMappingUtils {
             }
             for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  
entry : mappingDetail.getHostedQueues().entrySet()) {
                 Integer globalid = entry.getKey();
+                checkLogicQueueMappingItemOffset(entry.getValue());
                 String leaderBrokerName  = getLeaderBroker(entry.getValue());
                 if (!leaderBrokerName.equals(mappingDetail.getBname())) {
                     //not the leader

Reply via email to