lizhimins commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055124280


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java:
##########
@@ -80,12 +84,31 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
         final boolean brokerAllowSuspend,
         final MessageFilter messageFilter,
         RemotingCommand response) {
-
         PullMessageProcessor processor = brokerController.getPullMessageProcessor();
-        processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
-            requestHeader.getQueueId(), requestHeader, channel, response, getMessageResult.getNextBeginOffset());
+        final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        processor.composeResponseHeader(requestHeader, getMessageResult, topicConfig.getTopicSysFlag(),
+            subscriptionGroupConfig, response, clientAddress);
+        try {
+            processor.executeConsumeMessageHookBefore(request, requestHeader, getMessageResult, brokerAllowSuspend, response.getCode());
+        } catch (AbortProcessException e) {
+            response.setCode(e.getResponseCode());
+            response.setRemark(e.getErrorMessage());
+            return response;
+        }
 
+        //rewrite the response for the

Review Comment:
   The comment is incomplete.



##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset, int queueId, String brok
         }
     }
 
+    public CompletableFuture<MessageExt> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {

Review Comment:
   The synchronous interface should also be implemented on top of the asynchronous one.
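
   A minimal sketch of what this suggestion could look like, assuming the synchronous `getMessage` simply blocks on `getMessageAsync`. The class `EscapeBridgeSketch` and the `Msg` type are hypothetical stand-ins, not the real `EscapeBridge` or `MessageExt`, and the lookup body is a placeholder:

   ```java
   import java.util.concurrent.CompletableFuture;

   public class EscapeBridgeSketch {
       /** Hypothetical stand-in for MessageExt; not the real RocketMQ class. */
       public static final class Msg {
           public final String topic;
           public final long offset;
           public final int queueId;

           Msg(String topic, long offset, int queueId) {
               this.topic = topic;
               this.offset = offset;
               this.queueId = queueId;
           }
       }

       /** Asynchronous variant: the only place where the lookup logic lives. */
       public CompletableFuture<Msg> getMessageAsync(String topic, long offset, int queueId,
           String brokerName, boolean deCompressBody) {
           // Placeholder for the real local-store or remote-broker lookup (assumption).
           return CompletableFuture.supplyAsync(() -> new Msg(topic, offset, queueId));
       }

       /** Synchronous variant: a thin blocking wrapper around the async one. */
       public Msg getMessage(String topic, long offset, int queueId,
           String brokerName, boolean deCompressBody) {
           // join() blocks until completion and rethrows failures as CompletionException.
           return getMessageAsync(topic, offset, queueId, brokerName, deCompressBody).join();
       }

       public static void main(String[] args) {
           Msg m = new EscapeBridgeSketch().getMessage("TopicA", 42L, 3, "broker-a", false);
           System.out.println(m.topic + " " + m.offset + " " + m.queueId);
       }
   }
   ```

   With this shape the two entry points cannot drift apart, since the synchronous path contains no lookup logic of its own. Whether blocking with `join()` or a timed `get(...)` is appropriate depends on the caller's thread model, which this sketch does not address.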



##########
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java:
##########
@@ -1269,6 +1269,44 @@ public PullResult pullMessageFromSpecificBroker(String brokerName, String broker
         return pullResultExt;
     }
 
+    public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+                                                    String consumerGroup, String topic, int queueId, long offset,
+                                                    int maxNums,
+                                                    long timeoutMillis) throws RemotingException, InterruptedException {

Review Comment:
   The code style here is strange.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
