This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 4e9b097 Check the correctness of logic items
4e9b097 is described below
commit 4e9b097478e7de052a14cd883b47858e6a47b94e
Author: dongeforever <[email protected]>
AuthorDate: Fri Nov 19 20:00:23 2021 +0800
Check the correctness of logic items
---
.../common/statictopic/LogicQueueMappingItem.java | 2 +-
.../common/statictopic/TopicQueueMappingUtils.java | 67 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 3ab69d7..01686fd 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -7,7 +7,7 @@ public class LogicQueueMappingItem {
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset, included
- private long endOffset; // the end of the physical offset, excluded
+ private long endOffset = -1; // the end of the physical offset, excluded
private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 545b7cf..370f0ec 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -162,6 +162,72 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
+ /**
+  * Checks that a new item list is consistent with the old item list it replaces.
+  * <p>
+  * Both lists are walked in parallel from index 0 and items are paired by gen:
+  * a new item whose gen is lower than the current old item's gen is skipped, an old item
+  * whose gen is lower than the current new item's gen is rejected, and for equal gens the
+  * broker name, queue id, start offset and (when the old value is not -1) logic offset
+  * must be identical.
+  * <p>
+  * The comparisons are explicit checks rather than {@code assert} statements, so they are
+  * enforced even when the JVM runs with assertions disabled, which is the default.
+  *
+  * @param oldItems the previously known items; {@code null} or empty means nothing to check
+  * @param newItems the items that are about to replace {@code oldItems}
+  * @throws RuntimeException if {@code newItems} is null, empty or shorter than
+  *         {@code oldItems}, if the gens cannot be paired, or if a paired item differs
+  */
+ public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems,
+     ImmutableList<LogicQueueMappingItem> newItems) {
+     if (oldItems == null || oldItems.isEmpty()) {
+         return;
+     }
+     if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) {
+         throw new RuntimeException("The new item list is smaller than old ones");
+     }
+     int iold = 0, inew = 0;
+     while (iold < oldItems.size() && inew < newItems.size()) {
+         LogicQueueMappingItem newItem = newItems.get(inew);
+         LogicQueueMappingItem oldItem = oldItems.get(iold);
+         if (newItem.getGen() < oldItem.getGen()) {
+             // The new list holds a gen that the old list does not have at this position; skip it.
+             inew++;
+             continue;
+         } else if (oldItem.getGen() < newItem.getGen()) {
+             throw new RuntimeException("The gen is not correct for old item");
+         } else {
+             // Same gen: the fields compared below must not have changed.
+             if (!oldItem.getBname().equals(newItem.getBname())) {
+                 throw new RuntimeException("The bname is changed for gen " + oldItem.getGen());
+             }
+             if (oldItem.getQueueId() != newItem.getQueueId()) {
+                 throw new RuntimeException("The queueId is changed for gen " + oldItem.getGen());
+             }
+             if (oldItem.getStartOffset() != newItem.getStartOffset()) {
+                 throw new RuntimeException("The startOffset is changed for gen " + oldItem.getGen());
+             }
+             // An old logic offset of -1 is not compared against the new value.
+             if (oldItem.getLogicOffset() != -1
+                 && oldItem.getLogicOffset() != newItem.getLogicOffset()) {
+                 throw new RuntimeException("The logicOffset is changed for gen " + oldItem.getGen());
+             }
+             iold++;
+             inew++;
+         }
+     }
+     // NOTE(review): if newItems is exhausted first, the remaining old items are never
+     // compared and no error is raised - confirm this is intended.
+ }
+
+
+ /**
+  * Validates the items of a single logic queue.
+  * <p>
+  * The list is walked from the last item to the first. Each item must have a non-negative
+  * start offset, gen and queue id; its end offset, when not -1, must not be smaller than
+  * its start offset; and its gen must be strictly smaller than the gen of the item that
+  * follows it in the list. When both an item and its successor have a logic offset other
+  * than -1, the item's logic offset and its {@code computeMaxStaticQueueOffset()} must both
+  * be strictly smaller than the successor's logic offset.
+  *
+  * @param items the items to validate; {@code null} or empty is accepted without checks
+  * @throws RuntimeException if any of the rules above is violated
+  */
+ public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> items) {
+     if (items == null
+         || items.isEmpty()) {
+         return;
+     }
+     // Gen and logic offset of the item visited in the previous iteration, i.e. the item
+     // at index i + 1; -1 means "not available".
+     int lastGen = -1;
+     long lastOffset = -1;
+     for (int i = items.size() - 1; i >= 0; i--) {
+         LogicQueueMappingItem item = items.get(i);
+         if (item.getStartOffset() < 0
+             || item.getGen() < 0
+             || item.getQueueId() < 0) {
+             throw new RuntimeException("The field is illegal, should not be negative");
+         }
+         if (lastGen != -1 && item.getGen() >= lastGen) {
+             throw new RuntimeException("The gen does not increase monotonically");
+         }
+
+         if (item.getEndOffset() != -1
+             && item.getEndOffset() < item.getStartOffset()) {
+             throw new RuntimeException("The endOffset is smaller than the start offset");
+         }
+
+         // NOTE(review): an item whose logic offset is -1 resets lastOffset to -1, so the
+         // item before it skips the offset comparison - confirm this is intended.
+         if (lastOffset != -1 && item.getLogicOffset() != -1) {
+             if (item.getLogicOffset() >= lastOffset) {
+                 throw new RuntimeException("The base logic offset does not increase monotonically");
+             }
+             if (item.computeMaxStaticQueueOffset() >= lastOffset) {
+                 throw new RuntimeException("The max logic offset does not increase monotonically");
+             }
+         }
+         lastGen = item.getGen();
+         lastOffset = item.getLogicOffset();
+     }
+ }
+
public static Map<Integer, TopicQueueMappingOne>
checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList,
boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new
Comparator<TopicQueueMappingDetail>() {
@Override
@@ -178,6 +244,7 @@ public class TopicQueueMappingUtils {
}
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>
entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
+ checkLogicQueueMappingItemOffset(entry.getValue());
String leaderBrokerName = getLeaderBroker(entry.getValue());
if (!leaderBrokerName.equals(mappingDetail.getBname())) {
//not the leader