This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 71af9c7 Finish the logic for SendProcessor
71af9c7 is described below
commit 71af9c75cfdfc3a0a026b6524c45daea4b68cd65
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 10 14:23:12 2021 +0800
Finish the logic for SendProcessor
---
.../broker/processor/SendMessageProcessor.java | 75 +++++++++++++++++-----
.../client/impl/producer/TopicPublishInfo.java | 5 --
.../rocketmq/common/LogicQueueMappingItem.java | 4 ++
.../rocketmq/common/TopicQueueMappingDetail.java | 18 ++++++
.../rocketmq/common/protocol/ResponseCode.java | 2 +
5 files changed, 82 insertions(+), 22 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 91463a4..9daebf4 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -96,6 +97,10 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader);
+ if (rewriteResult != null) {
+ return CompletableFuture.completedFuture(rewriteResult);
+ }
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request,
mqtraceContext);
if (requestHeader.isBatch()) {
@@ -106,6 +111,54 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
}
+ private RemotingCommand buildErrorResponse(int code, String remark) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+ response.setCode(code);
+ response.setRemark(remark);
+ return response;
+ }
+ /**
+ * If the response is not null, it meets some errors
+ * @param requestHeader
+ * @return
+ */
+ private RemotingCommand
rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) {
+ try {
+ TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+ if (mappingDetail == null) {
+ return null;
+ }
+ if
(!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in current broker %s",
requestHeader.getTopic(), requestHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ }
+
requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
+ }
+ return null;
+ }
+
+ private RemotingCommand rewriteResponseForStaticTopic(String topic,
SendMessageResponseHeader responseHeader) {
+ try {
+ TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+ if (mappingDetail == null) {
+ return null;
+ }
+ if
(!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in current broker %s", topic,
responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ }
+ long staticLogicOffset =
mappingDetail.convertToLogicOffset(responseHeader.getQueueId(),
responseHeader.getQueueOffset());
+ if (staticLogicOffset < 0) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d convert offset error in current broker %s", topic,
responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+
+ }
+ responseHeader.setQueueOffset(staticLogicOffset);
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
+ }
+ return null;
+ }
+
@Override
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
@@ -310,12 +363,6 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
- LogicalQueueContext logicalQueueContext =
super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(),
response);
- CompletableFuture<RemotingCommand> future =
logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
- if (future != null) {
- return future;
- }
-
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag =
origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
@@ -324,14 +371,12 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
response.setRemark(
"the broker[" +
this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is
forbidden");
- logicalQueueContext.hookAfterPut(null);
return CompletableFuture.completedFuture(response);
}
putMessageResult =
this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult =
this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
- logicalQueueContext.hookAfterPut(putMessageResult);
return handlePutMessageResultFuture(putMessageResult, response,
request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
@@ -549,6 +594,11 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+ RemotingCommand rewriteResult =
rewriteResponseForStaticTopic(msg.getTopic(), responseHeader);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+
doResponse(ctx, request, response);
if (hasSendMessageHook()) {
@@ -623,16 +673,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
String clusterName =
this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch,
MessageConst.PROPERTY_CLUSTER, clusterName);
- LogicalQueueContext logicalQueueContext =
super.buildLogicalQueueContext(messageExtBatch.getTopic(),
messageExtBatch.getQueueId(), response);
- CompletableFuture<RemotingCommand> future =
logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
- if (future != null) {
- return future;
- }
-
CompletableFuture<PutMessageResult> putMessageResult =
this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
-
- logicalQueueContext.hookAfterPut(putMessageResult);
-
return handlePutMessageResultFuture(putMessageResult, response,
request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 60974cc..2f8337e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,13 +17,9 @@
package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -120,5 +116,4 @@ public class TopicPublishInfo {
public void setTopicRouteData(final TopicRouteData topicRouteData) {
this.topicRouteData = topicRouteData;
}
-
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 50d88ae..78a27dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -18,6 +18,10 @@ public class LogicQueueMappingItem {
this.timeOfStart = timeOfStart;
}
+ public long convertToStaticLogicOffset(long physicalLogicOffset) {
+ return logicOffset + (physicalLogicOffset - startOffset);
+ }
+
public int getGen() {
return gen;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index d3e3d92..a181130 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -81,6 +81,24 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
}
+ public long convertToLogicOffset(Integer globalId, long
physicalLogicOffset) {
+ List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+ if (mappingItems == null
+ || mappingItems.isEmpty()) {
+ return -1;
+ }
+ if (bname.equals(mappingItems.get(mappingItems.size() -
1).getBname())) {
+ return mappingItems.get(mappingItems.size() -
1).convertToStaticLogicOffset(physicalLogicOffset);
+ }
+ //Consider the "switch" process, reduce the error
+ if (mappingItems.size() >= 2
+ && bname.equals(mappingItems.get(mappingItems.size() -
2).getBname())) {
+ return mappingItems.get(mappingItems.size() -
2).convertToStaticLogicOffset(physicalLogicOffset);
+ }
+ return -1;
+ }
+
+
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new
TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index 42b9c4f..3944c18 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -84,4 +84,6 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int POLLING_FULL = 209;
public static final int POLLING_TIMEOUT = 210;
+
+ public static final int NOT_LEADER_FOR_QUEUE = 501;
}