This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new c673cb7 Use timestamp as the epoch to prevent some unknown problem
c673cb7 is described below
commit c673cb734eb6bf81b3c446f6b3816bb354091ace
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 23:28:14 2021 +0800
Use timestamp as the epoch to prevent some unknown problem
---
.../java/org/apache/rocketmq/common/TopicQueueMappingDetail.java | 2 +-
.../java/org/apache/rocketmq/common/TopicQueueMappingInfo.java | 8 ++++----
.../java/org/apache/rocketmq/common/TopicQueueMappingUtils.java | 8 ++++----
.../protocol/body/TopicConfigAndMappingSerializeWrapper.java | 2 +-
.../rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java | 7 ++++---
5 files changed, 14 insertions(+), 13 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 9b67751..188bfcc 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -29,7 +29,7 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/,
ImmutableList<LogicQueueMappingItem>> hostedQueues = new
ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
- public TopicQueueMappingDetail(String topic, int totalQueues, String
bname, int epoch) {
+ public TopicQueueMappingDetail(String topic, int totalQueues, String
bname, long epoch) {
super(topic, totalQueues, bname, epoch);
buildIdMap();
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index b4bf776..f8c803c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -28,14 +28,14 @@ public class TopicQueueMappingInfo extends
RemotingSerializable {
String topic; // redundant field
int totalQueues;
String bname; //identify the hosted broker name
- int epoch; //important to fence the old dirty data
+ long epoch; //important to fence the old dirty data
boolean dirty; //indicate if the data is dirty
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/>
currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/>
prevIdMap = new ConcurrentHashMap<Integer, Integer>();
- public TopicQueueMappingInfo(String topic, int totalQueues, String bname,
int epoch) {
+ public TopicQueueMappingInfo(String topic, int totalQueues, String bname,
long epoch) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
@@ -64,11 +64,11 @@ public class TopicQueueMappingInfo extends
RemotingSerializable {
return topic;
}
- public int getEpoch() {
+ public long getEpoch() {
return epoch;
}
- public void setEpoch(int epoch) {
+ public void setEpoch(long epoch) {
this.epoch = epoch;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 87c54a3..686208a 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -89,8 +89,8 @@ public class TopicQueueMappingUtils {
return new MappingAllocator(idToBroker, brokerNumMap);
}
- public static Map.Entry<Integer, Integer>
findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
- int epoch = -1;
+ public static Map.Entry<Long, Integer>
findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
+ long epoch = -1;
int queueNum = 0;
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
if (mappingDetail.getEpoch() > epoch) {
@@ -100,14 +100,14 @@ public class TopicQueueMappingUtils {
queueNum = mappingDetail.getTotalQueues();
}
}
- return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(epoch,
queueNum);
+ return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch,
queueNum);
}
public static Map<Integer, ImmutableList<LogicQueueMappingItem>>
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean
replace) {
Collections.sort(mappingDetailList, new
Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1,
TopicQueueMappingDetail o2) {
- return o2.getEpoch() - o1.getEpoch();
+ return Long.compare(o2.getEpoch(), o1.getEpoch()); // descending; no long->int overflow
}
});
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
index e6a34c4..ba494aa 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
@@ -35,7 +35,7 @@ public class TopicConfigAndMappingSerializeWrapper extends
TopicConfigSerializeW
public static TopicConfigAndMappingSerializeWrapper
from(TopicConfigSerializeWrapper wrapper) {
if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
- return (TopicConfigAndMappingSerializeWrapper)wrapper;
+ return (TopicConfigAndMappingSerializeWrapper) wrapper;
}
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new
TopicConfigAndMappingSerializeWrapper();
mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a1d3f6f..5cee8c8 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -134,7 +134,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
}
}
- Map.Entry<Integer, Integer> maxEpochAndNum = new
AbstractMap.SimpleImmutableEntry<>(-1, queueNum);
+ Map.Entry<Long, Integer> maxEpochAndNum = new
AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!existedTopicConfigMap.isEmpty()) {
//make sure it it not null
existedTopicConfigMap.forEach((key, value) -> {
@@ -155,7 +155,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- final Map.Entry<Integer, Integer> tmpMaxEpochAndNum =
maxEpochAndNum;
+ final Map.Entry<Long, Integer> tmpMaxEpochAndNum =
maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() !=
mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose
not match %d != %d in %s", tmpMaxEpochAndNum.getKey(),
mappingDetail.getEpoch(), mappingDetail.getBname()));
@@ -200,7 +200,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
- int epoch = maxEpochAndNum.getKey() + 1;
+ long epoch = Math.max(maxEpochAndNum.getKey() + 1000,
System.currentTimeMillis());
newIdToBroker.forEach( (queueId, broker) -> {
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) {
@@ -218,6 +218,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
});
+ //If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);