This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f17112963 Optimize the build of sendMessageContext and avoid unnecessary repeated request parsing (#5578)
f17112963 is described below
commit f17112963cad4d7b489d136c3c9f6dae3567c0f5
Author: rongtong <[email protected]>
AuthorDate: Thu Nov 24 09:54:54 2022 +0800
Optimize the build of sendMessageContext and avoid unnecessary repeated request parsing (#5578)
---
.../processor/AbstractSendMessageProcessor.java | 57 +++++++++-------------
.../broker/processor/ReplyMessageProcessor.java | 4 +-
.../broker/processor/SendMessageProcessor.java | 12 ++---
3 files changed, 29 insertions(+), 44 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index fe7040863..d87b765b6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -345,20 +345,25 @@ public abstract class AbstractSendMessageProcessor
implements NettyRequestProces
}
protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
- SendMessageRequestHeader requestHeader) {
+ SendMessageRequestHeader requestHeader, RemotingCommand request) {
String namespace =
NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
- SendMessageContext traceContext;
- traceContext = new SendMessageContext();
- traceContext.setNamespace(namespace);
- traceContext.setProducerGroup(requestHeader.getProducerGroup());
- traceContext.setTopic(requestHeader.getTopic());
- traceContext.setMsgProps(requestHeader.getProperties());
-
traceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- traceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
-
traceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
- traceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
- traceContext.setRequestTimeStamp(System.currentTimeMillis());
+ SendMessageContext sendMessageContext;
+ sendMessageContext = new SendMessageContext();
+ sendMessageContext.setNamespace(namespace);
+ sendMessageContext.setProducerGroup(requestHeader.getProducerGroup());
+ sendMessageContext.setTopic(requestHeader.getTopic());
+ sendMessageContext.setBodyLength(request.getBody().length);
+ sendMessageContext.setMsgProps(requestHeader.getProperties());
+
sendMessageContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
sendMessageContext.setBrokerAddr(this.brokerController.getBrokerAddr());
+ sendMessageContext.setQueueId(requestHeader.getQueueId());
+
sendMessageContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
+ sendMessageContext.setBornTimeStamp(requestHeader.getBornTimestamp());
+ sendMessageContext.setRequestTimeStamp(System.currentTimeMillis());
+
+ String owner =
request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+ sendMessageContext.setCommercialOwner(owner);
Map<String, String> properties =
MessageDecoder.string2messageProperties(requestHeader.getProperties());
String uniqueKey =
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
@@ -369,14 +374,14 @@ public abstract class AbstractSendMessageProcessor
implements NettyRequestProces
if (uniqueKey == null) {
uniqueKey = "";
}
- traceContext.setMsgUniqueKey(uniqueKey);
+ sendMessageContext.setMsgUniqueKey(uniqueKey);
if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
- traceContext.setMsgType(MessageType.Order_Msg);
+ sendMessageContext.setMsgType(MessageType.Order_Msg);
} else {
- traceContext.setMsgType(MessageType.Normal_Msg);
+ sendMessageContext.setMsgType(MessageType.Normal_Msg);
}
- return traceContext;
+ return sendMessageContext;
}
public boolean hasSendMessageHook() {
@@ -532,29 +537,11 @@ public abstract class AbstractSendMessageProcessor
implements NettyRequestProces
NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
}
- public void executeSendMessageHookBefore(final ChannelHandlerContext ctx,
final RemotingCommand request,
- SendMessageContext context) {
+ public void executeSendMessageHookBefore(SendMessageContext context) {
if (hasSendMessageHook()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
- final SendMessageRequestHeader requestHeader =
parseRequestHeader(request);
-
- String namespace =
NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
- if (null != requestHeader) {
- context.setNamespace(namespace);
-
context.setProducerGroup(requestHeader.getProducerGroup());
- context.setTopic(requestHeader.getTopic());
- context.setBodyLength(request.getBody().length);
- context.setMsgProps(requestHeader.getProperties());
-
context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
context.setBrokerAddr(this.brokerController.getBrokerAddr());
- context.setQueueId(requestHeader.getQueueId());
- }
-
hook.sendMessageBefore(context);
- if (requestHeader != null) {
- requestHeader.setProperties(context.getMsgProps());
- }
} catch (AbortProcessException e) {
throw e;
} catch (Throwable e) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 35468ab31..dbc87a870 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -70,8 +70,8 @@ public class ReplyMessageProcessor extends
AbstractSendMessageProcessor {
return null;
}
- mqtraceContext = buildMsgContext(ctx, requestHeader);
- this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+ mqtraceContext = buildMsgContext(ctx, requestHeader, request);
+ this.executeSendMessageHookBefore(mqtraceContext);
RemotingCommand response = this.processReplyMessageRequest(ctx,
request, mqtraceContext, requestHeader);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3bf1f31a2..14095f9ec 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -81,7 +81,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
- SendMessageContext traceContext;
+ SendMessageContext sendMessageContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
@@ -95,11 +95,9 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (rewriteResult != null) {
return rewriteResult;
}
- traceContext = buildMsgContext(ctx, requestHeader);
- String owner =
request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
- traceContext.setCommercialOwner(owner);
+ sendMessageContext = buildMsgContext(ctx, requestHeader,
request);
try {
- this.executeSendMessageHookBefore(ctx, request,
traceContext);
+ this.executeSendMessageHookBefore(sendMessageContext);
} catch (AbortProcessException e) {
final RemotingCommand errorResponse =
RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
errorResponse.setOpaque(request.getOpaque());
@@ -108,10 +106,10 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
RemotingCommand response;
if (requestHeader.isBatch()) {
- response = this.sendBatchMessage(ctx, request,
traceContext, requestHeader, mappingContext,
+ response = this.sendBatchMessage(ctx, request,
sendMessageContext, requestHeader, mappingContext,
(ctx1, response1) ->
executeSendMessageHookAfter(response1, ctx1));
} else {
- response = this.sendMessage(ctx, request, traceContext,
requestHeader, mappingContext,
+ response = this.sendMessage(ctx, request,
sendMessageContext, requestHeader, mappingContext,
(ctx12, response12) ->
executeSendMessageHookAfter(response12, ctx12));
}