This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 36a82fa Polish the rewrite logic for SendProcessor
36a82fa is described below
commit 36a82fa67785aa4e98a2b6a6ceeda576e9cf66b7
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 10 14:57:19 2021 +0800
Polish the rewrite logic for SendProcessor
---
.../broker/processor/SendMessageProcessor.java | 29 ++++++++++++++++------
.../rocketmq/common/TopicQueueMappingDetail.java | 23 +++++++++++++++++
2 files changed, 45 insertions(+), 7 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 9daebf4..7b88c75 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -126,16 +127,28 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
try {
TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
+ //it is not static topic
return null;
}
- if
(!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in current broker %s",
requestHeader.getTopic(), requestHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ Integer phyQueueId = null;
+ //compatible with the old logic, but it fact, this should not
happen
+ if (requestHeader.getQueueId() < 0) {
+ Iterator<Map.Entry<Integer, Integer>> it =
mappingDetail.getCurrIdMap().entrySet().iterator();
+ if (it.hasNext()) {
+ phyQueueId = it.next().getValue();
+ }
+ } else {
+ phyQueueId =
mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
+ }
+ if (phyQueueId == null) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
requestHeader.getTopic(), requestHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ } else {
+ requestHeader.setQueueId(phyQueueId);
+ return null;
}
-
requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
}
- return null;
}
private RemotingCommand rewriteResponseForStaticTopic(String topic,
SendMessageResponseHeader responseHeader) {
@@ -144,14 +157,16 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (mappingDetail == null) {
return null;
}
- if
(!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in current broker %s", topic,
responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ Integer globalId =
mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
+ if (globalId == null) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exist in response process of current broker %s",
topic, responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
}
- long staticLogicOffset =
mappingDetail.convertToLogicOffset(responseHeader.getQueueId(),
responseHeader.getQueueOffset());
+ long staticLogicOffset =
mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d convert offset error in current broker %s", topic,
responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
}
+ responseHeader.setQueueId(globalId);
responseHeader.setQueueOffset(staticLogicOffset);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index a181130..0021310 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -27,6 +27,9 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>>
hostedQueues = new ConcurrentHashMap<Integer,
ImmutableList<LogicQueueMappingItem>>();
+ transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/>
currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
+
+
public TopicQueueMappingDetail(String topic, int totalQueues, String
bname) {
super(topic, totalQueues, bname);
@@ -45,6 +48,18 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
+ this.currIdMapRevert = revert(this.currIdMap);
+ }
+
+ public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer,
Integer> original) {
+ if (original == null || original.isEmpty()) {
+ return new ConcurrentHashMap<Integer, Integer>();
+ }
+ ConcurrentMap<Integer, Integer> tmpIdMap = new
ConcurrentHashMap<Integer, Integer>();
+ for (Map.Entry<Integer, Integer> entry: tmpIdMap.entrySet()) {
+ tmpIdMap.put(entry.getValue(), entry.getKey());
+ }
+ return tmpIdMap;
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
@@ -107,6 +122,14 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
return topicQueueMappingInfo;
}
+ public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
+ return currIdMapRevert;
+ }
+
+ public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer>
currIdMapRevert) {
+ this.currIdMapRevert = currIdMapRevert;
+ }
+
public int getTotalQueues() {
return totalQueues;
}