This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cb7fa3e0b0 Revise the measurement method of GROUP_GET_LATENCY to reveal its intended semantics (#7808)
cb7fa3e0b0 is described below
commit cb7fa3e0b015ec1bf74b95a98e7ef84eaf83b52d
Author: rongtong <[email protected]>
AuthorDate: Sun Feb 4 10:27:21 2024 +0800
Revise the measurement method of GROUP_GET_LATENCY to reveal its intended semantics (#7808)
---
.../apache/rocketmq/broker/plugin/PullMessageResultHandler.java | 3 ++-
.../rocketmq/broker/processor/DefaultPullMessageResultHandler.java | 5 ++---
.../org/apache/rocketmq/broker/processor/PeekMessageProcessor.java | 2 +-
.../org/apache/rocketmq/broker/processor/PopMessageProcessor.java | 2 +-
.../org/apache/rocketmq/broker/processor/PullMessageProcessor.java | 7 +++++--
5 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
index 0b9f4295c2..bddb57f150 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
@@ -51,5 +51,6 @@ public interface PullMessageResultHandler {
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
final RemotingCommand response,
- final TopicQueueMappingContext mappingContext);
+ final TopicQueueMappingContext mappingContext,
+ final long beginTimeMills);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 913e1a96c4..43b66b4c51 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -84,7 +84,8 @@ public class DefaultPullMessageResultHandler implements
PullMessageResultHandler
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
RemotingCommand response,
- TopicQueueMappingContext mappingContext) {
+ TopicQueueMappingContext mappingContext,
+ long beginTimeMills) {
PullMessageProcessor processor =
brokerController.getPullMessageProcessor();
final String clientAddress =
RemotingHelper.parseChannelRemoteAddr(channel);
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -137,8 +138,6 @@ public class DefaultPullMessageResultHandler implements
PullMessageResultHandler
}
if
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
-
- final long beginTimeMills =
this.brokerController.getMessageStore().now();
final byte[] r =
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index a72759883c..55552003d8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -81,6 +81,7 @@ public class PeekMessageProcessor implements
NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel,
RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
+ final long beginTimeMills =
this.brokerController.getMessageStore().now();
RemotingCommand response =
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
final PopMessageResponseHeader responseHeader =
(PopMessageResponseHeader) response.readCustomHeader();
final PeekMessageRequestHeader requestHeader =
@@ -188,7 +189,6 @@ public class PeekMessageProcessor implements
NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(),
getMessageResult.getMessageCount());
if
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
- final long beginTimeMills =
this.brokerController.getMessageStore().now();
final byte[] r =
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 105e11643f..59ff2e0fd5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -196,6 +196,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException {
+ final long beginTimeMills =
this.brokerController.getMessageStore().now();
request.addExtFieldIfNotExist(BORN_TIME,
String.valueOf(System.currentTimeMillis()));
if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
request.addExtField(BORN_TIME,
String.valueOf(System.currentTimeMillis()));
@@ -435,7 +436,6 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
switch (finalResponse.getCode()) {
case ResponseCode.SUCCESS:
if
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
- final long beginTimeMills =
this.brokerController.getMessageStore().now();
final byte[] r =
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index ea9c327e98..d53454f215 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -300,6 +300,7 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel,
RemotingCommand request, boolean brokerAllowSuspend, boolean
brokerAllowFlowCtrSuspend)
throws RemotingCommandException {
+ final long beginTimeMills =
this.brokerController.getMessageStore().now();
RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
@@ -555,7 +556,8 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
brokerAllowSuspend,
messageFilter,
finalResponse,
- mappingContext
+ mappingContext,
+ beginTimeMills
);
})
.thenAccept(result ->
NettyRemotingAbstract.writeResponse(channel, request, result));
@@ -574,7 +576,8 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
brokerAllowSuspend,
messageFilter,
response,
- mappingContext
+ mappingContext,
+ beginTimeMills
);
}
return null;