This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 18c30cbab6 [ISSUE #8592] Not notify long polling request when pop
orderly consume blocked (#8593)
18c30cbab6 is described below
commit 18c30cbab653a2e5c383aace271e1972204b5291
Author: lizhimins <[email protected]>
AuthorDate: Thu Aug 29 15:05:52 2024 +0800
[ISSUE #8592] Not notify long polling request when pop orderly consume
blocked (#8593)
---
.../broker/processor/PopMessageProcessor.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6073023722..47ef8e4013 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -551,18 +551,24 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
future.whenComplete((result, throwable) ->
queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
true, lockKey, true);
- if (isOrder &&
brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
- requestHeader.getConsumerGroup(), queueId,
requestHeader.getInvisibleTime())) {
-
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId) - offset + restNum);
- return future;
- }
+ // The current request calculates the total number of messages
+ // waiting to be filtered for new message arrival notifications in
+ // the long-polling service, and needs to disregard the backlog in the
+ // orderly consumption scenario. A rest message num that included the
+ // blocked queue accumulation would lead to frequent unnecessary wake-ups
+ // of long-polling requests, resulting in unnecessary CPU usage.
+ // When the client acks a message, the long-polling request is notified
+ // by AckMessageProcessor.ackOrderly(), so the message will not be delayed.
if (isOrder) {
+ if (brokerController.getConsumerOrderInfoManager().checkBlock(
+ attemptId, topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInvisibleTime())) {
+ // should not add accumulation (max offset - consumer offset) here
+ future.complete(restNum);
+ return future;
+ }
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
- topic,
- requestHeader.getConsumerGroup(),
- queueId
- );
+ topic, requestHeader.getConsumerGroup(), queueId);
}
if (getMessageResult.getMessageMapedList().size() >=
requestHeader.getMaxMsgNums()) {