This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f17112963 Optimize the build of sendMessageContext and avoid unnecessary repeated request parsing (#5578)
f17112963 is described below

commit f17112963cad4d7b489d136c3c9f6dae3567c0f5
Author: rongtong <[email protected]>
AuthorDate: Thu Nov 24 09:54:54 2022 +0800

    Optimize the build of sendMessageContext and avoid unnecessary repeated request parsing (#5578)
---
 .../processor/AbstractSendMessageProcessor.java    | 57 +++++++++-------------
 .../broker/processor/ReplyMessageProcessor.java    |  4 +-
 .../broker/processor/SendMessageProcessor.java     | 12 ++---
 3 files changed, 29 insertions(+), 44 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index fe7040863..d87b765b6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -345,20 +345,25 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
-        SendMessageRequestHeader requestHeader) {
+        SendMessageRequestHeader requestHeader, RemotingCommand request) {
         String namespace = 
NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
 
-        SendMessageContext traceContext;
-        traceContext = new SendMessageContext();
-        traceContext.setNamespace(namespace);
-        traceContext.setProducerGroup(requestHeader.getProducerGroup());
-        traceContext.setTopic(requestHeader.getTopic());
-        traceContext.setMsgProps(requestHeader.getProperties());
-        
traceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-        traceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
-        
traceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
-        traceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
-        traceContext.setRequestTimeStamp(System.currentTimeMillis());
+        SendMessageContext sendMessageContext;
+        sendMessageContext = new SendMessageContext();
+        sendMessageContext.setNamespace(namespace);
+        sendMessageContext.setProducerGroup(requestHeader.getProducerGroup());
+        sendMessageContext.setTopic(requestHeader.getTopic());
+        sendMessageContext.setBodyLength(request.getBody().length);
+        sendMessageContext.setMsgProps(requestHeader.getProperties());
+        
sendMessageContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        
sendMessageContext.setBrokerAddr(this.brokerController.getBrokerAddr());
+        sendMessageContext.setQueueId(requestHeader.getQueueId());
+        
sendMessageContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
+        sendMessageContext.setBornTimeStamp(requestHeader.getBornTimestamp());
+        sendMessageContext.setRequestTimeStamp(System.currentTimeMillis());
+
+        String owner = 
request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+        sendMessageContext.setCommercialOwner(owner);
 
         Map<String, String> properties = 
MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String uniqueKey = 
properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
@@ -369,14 +374,14 @@ public abstract class AbstractSendMessageProcessor 
implements NettyRequestProces
         if (uniqueKey == null) {
             uniqueKey = "";
         }
-        traceContext.setMsgUniqueKey(uniqueKey);
+        sendMessageContext.setMsgUniqueKey(uniqueKey);
 
         if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
-            traceContext.setMsgType(MessageType.Order_Msg);
+            sendMessageContext.setMsgType(MessageType.Order_Msg);
         } else {
-            traceContext.setMsgType(MessageType.Normal_Msg);
+            sendMessageContext.setMsgType(MessageType.Normal_Msg);
         }
-        return traceContext;
+        return sendMessageContext;
     }
 
     public boolean hasSendMessageHook() {
@@ -532,29 +537,11 @@ public abstract class AbstractSendMessageProcessor 
implements NettyRequestProces
         NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
     }
 
-    public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, 
final RemotingCommand request,
-        SendMessageContext context) {
+    public void executeSendMessageHookBefore(SendMessageContext context) {
         if (hasSendMessageHook()) {
             for (SendMessageHook hook : this.sendMessageHookList) {
                 try {
-                    final SendMessageRequestHeader requestHeader = 
parseRequestHeader(request);
-
-                    String namespace = 
NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
-                    if (null != requestHeader) {
-                        context.setNamespace(namespace);
-                        
context.setProducerGroup(requestHeader.getProducerGroup());
-                        context.setTopic(requestHeader.getTopic());
-                        context.setBodyLength(request.getBody().length);
-                        context.setMsgProps(requestHeader.getProperties());
-                        
context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-                        
context.setBrokerAddr(this.brokerController.getBrokerAddr());
-                        context.setQueueId(requestHeader.getQueueId());
-                    }
-
                     hook.sendMessageBefore(context);
-                    if (requestHeader != null) {
-                        requestHeader.setProperties(context.getMsgProps());
-                    }
                 } catch (AbortProcessException e) {
                     throw e;
                 } catch (Throwable e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 35468ab31..dbc87a870 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -70,8 +70,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
             return null;
         }
 
-        mqtraceContext = buildMsgContext(ctx, requestHeader);
-        this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+        mqtraceContext = buildMsgContext(ctx, requestHeader, request);
+        this.executeSendMessageHookBefore(mqtraceContext);
 
         RemotingCommand response = this.processReplyMessageRequest(ctx, 
request, mqtraceContext, requestHeader);
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3bf1f31a2..14095f9ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -81,7 +81,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
-        SendMessageContext traceContext;
+        SendMessageContext sendMessageContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
                 return this.consumerSendMsgBack(ctx, request);
@@ -95,11 +95,9 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 if (rewriteResult != null) {
                     return rewriteResult;
                 }
-                traceContext = buildMsgContext(ctx, requestHeader);
-                String owner = 
request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
-                traceContext.setCommercialOwner(owner);
+                sendMessageContext = buildMsgContext(ctx, requestHeader, 
request);
                 try {
-                    this.executeSendMessageHookBefore(ctx, request, 
traceContext);
+                    this.executeSendMessageHookBefore(sendMessageContext);
                 } catch (AbortProcessException e) {
                     final RemotingCommand errorResponse = 
RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                     errorResponse.setOpaque(request.getOpaque());
@@ -108,10 +106,10 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
 
                 RemotingCommand response;
                 if (requestHeader.isBatch()) {
-                    response = this.sendBatchMessage(ctx, request, 
traceContext, requestHeader, mappingContext,
+                    response = this.sendBatchMessage(ctx, request, 
sendMessageContext, requestHeader, mappingContext,
                         (ctx1, response1) -> 
executeSendMessageHookAfter(response1, ctx1));
                 } else {
-                    response = this.sendMessage(ctx, request, traceContext, 
requestHeader, mappingContext,
+                    response = this.sendMessage(ctx, request, 
sendMessageContext, requestHeader, mappingContext,
                         (ctx12, response12) -> 
executeSendMessageHookAfter(response12, ctx12));
                 }
 

Reply via email to