lizhimins commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055124280
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java:
##########
@@ -80,12 +84,31 @@ public RemotingCommand handle(final GetMessageResult
getMessageResult,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
RemotingCommand response) {
-
PullMessageProcessor processor =
brokerController.getPullMessageProcessor();
- processor.updateBroadcastPulledOffset(requestHeader.getTopic(),
requestHeader.getConsumerGroup(),
- requestHeader.getQueueId(), requestHeader, channel, response,
getMessageResult.getNextBeginOffset());
+ final String clientAddress =
RemotingHelper.parseChannelRemoteAddr(channel);
+ TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+ processor.composeResponseHeader(requestHeader, getMessageResult,
topicConfig.getTopicSysFlag(),
+ subscriptionGroupConfig, response, clientAddress);
+ try {
+ processor.executeConsumeMessageHookBefore(request, requestHeader,
getMessageResult, brokerAllowSuspend, response.getCode());
+ } catch (AbortProcessException e) {
+ response.setCode(e.getResponseCode());
+ response.setRemark(e.getErrorMessage());
+ return response;
+ }
+ //rewrite the response for the
Review Comment:
注释不全
##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset,
int queueId, String brok
}
}
+ public CompletableFuture<MessageExt> getMessageAsync(String topic, long
offset, int queueId, String brokerName, boolean deCompressBody) {
Review Comment:
应该把同步接口也用异步的去实现
##########
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java:
##########
@@ -1269,6 +1269,44 @@ public PullResult pullMessageFromSpecificBroker(String
brokerName, String broker
return pullResultExt;
}
+ public CompletableFuture<PullResult>
pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+ String consumerGroup,
String topic, int queueId, long offset,
+ int maxNums,
+ long timeoutMillis) throws
RemotingException, InterruptedException {
Review Comment:
codestyle is strange
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]