RongtongJin commented on code in PR #7891:
URL: https://github.com/apache/rocketmq/pull/7891#discussion_r1519139992


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java:
##########
@@ -536,29 +540,33 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
                 getMessageResult.setNextBeginOffset(broadcastInitOffset);
             } else {
                 SubscriptionData finalSubscriptionData = subscriptionData;
-                RemotingCommand finalResponse = response;
                 messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(),
                         requestHeader.getMaxMsgNums(), messageFilter)
                     .thenApply(result -> {
                         if (null == result) {
-                            finalResponse.setCode(ResponseCode.SYSTEM_ERROR);
-                            finalResponse.setRemark("store getMessage return null");
-                            return finalResponse;
+                            response.setCode(ResponseCode.SYSTEM_ERROR);
+                            response.setRemark("store getMessage return null");
+                            return response;
                         }
+
                         brokerController.getColdDataCgCtrService().coldAcc(requestHeader.getConsumerGroup(), result.getColdDataSum());
-                        return pullMessageResultHandler.handle(
-                            result,
-                            request,
-                            requestHeader,
-                            channel,
-                            finalSubscriptionData,
-                            subscriptionGroupConfig,
-                            brokerAllowSuspend,
-                            messageFilter,
-                            finalResponse,
-                            mappingContext,
-                            beginTimeMills
+                        RemotingCommand finalResponse = pullMessageResultHandler.handle(
+                                result,
+                                request,
+                                requestHeader,
+                                channel,
+                                finalSubscriptionData,
+                                subscriptionGroupConfig,
+                                brokerAllowSuspend,
+                                messageFilter,
+                                response,
+                                mappingContext,
+                                beginTimeMills
                         );
+                        // uses asynchronous return, causing the Response in doAfterResponse of NettyRemotingAbstract rpcHook to be null
+                        // After successfully pulling the message asynchronously, the doAfterResponse of rpcHook is actively called back.
+                        finalResponse = doAfterRpcHooks(channel, request, finalResponse);

Review Comment:
   It would be better not to execute the RPC hook in the business layer, because 
every business-layer processor that returns asynchronously would then have to 
duplicate the same code. Handling it uniformly in the remoting layer would be 
cleaner.
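
As a rough illustration of the suggestion, the sketch below runs the after-response hooks in one place, once the processor's asynchronous result completes. Every type here (`Command`, `AfterResponseHook`, `RemotingLayer`, `dispatch`) is a simplified, hypothetical stand-in and not the actual RocketMQ class or method; it only shows the shape of the idea, under the assumption that processors hand back a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class RemotingHookSketch {

    /** Hypothetical stand-in for a request/response command. */
    static final class Command {
        final int code;
        final String remark;

        Command(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    /** Hypothetical stand-in for the after-response half of an RPC hook. */
    interface AfterResponseHook {
        void doAfterResponse(String remoteAddr, Command request, Command response);
    }

    /** Hypothetical remoting layer: the single place where hooks are invoked. */
    static final class RemotingLayer {
        private final List<AfterResponseHook> hooks = new ArrayList<>();

        void registerHook(AfterResponseHook hook) {
            hooks.add(hook);
        }

        /**
         * Runs the processor, then applies every registered hook to the
         * response once the asynchronous result is available. Processors
         * never call the hooks themselves.
         */
        CompletableFuture<Command> dispatch(
                String remoteAddr,
                Command request,
                Function<Command, CompletableFuture<Command>> processor) {
            return processor.apply(request).thenApply(response -> {
                for (AfterResponseHook hook : hooks) {
                    hook.doAfterResponse(remoteAddr, request, response);
                }
                return response;
            });
        }
    }

    public static void main(String[] args) {
        RemotingLayer layer = new RemotingLayer();
        layer.registerHook((addr, req, resp) ->
                System.out.println("hook saw response code " + resp.code + " from " + addr));

        // A processor that answers asynchronously and knows nothing about hooks.
        Function<Command, CompletableFuture<Command>> pullProcessor =
                req -> CompletableFuture.supplyAsync(() -> new Command(0, "ok"));

        layer.dispatch("127.0.0.1:10911", new Command(11, "pull"), pullProcessor).join();
    }
}
```

With this arrangement a processor such as the pull handler only returns its future, and any other asynchronous processor gets the same hook behavior without extra code.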


