This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 671977e46 [ISSUE #5847] Add checkBlock for hasMsgFromQueue
671977e46 is described below
commit 671977e4694865c6102cc85ce2e2fd0a8833561e
Author: zhouxiang <[email protected]>
AuthorDate: Fri Feb 3 16:07:34 2023 +0800
[ISSUE #5847] Add checkBlock for hasMsgFromQueue
---
.../org/apache/rocketmq/broker/processor/NotificationProcessor.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index e10d72328..3b306ca2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -195,6 +195,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
response.setRemark(errorInfo);
return response;
}
+
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
@@ -263,6 +264,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader
requestHeader, int queueId) {
+ if
(this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), queueId, 0)) {
+ return false;
+ }
String topic = isRetry ?
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
long offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId);
long restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset;