This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7fe0349f71 [ISSUE #10063] Notification request adds subscription
expression to support on-demand wake-up (#10064)
7fe0349f71 is described below
commit 7fe0349f7167de2b97b644602e04c9fe05a84333
Author: qianye <[email protected]>
AuthorDate: Thu Feb 5 10:55:32 2026 +0800
[ISSUE #10063] Notification request adds subscription expression to support
on-demand wake-up (#10064)
---
.../broker/processor/NotificationProcessor.java | 102 ++++++++++++++++++---
.../org/apache/rocketmq/common/BrokerConfig.java | 22 ++++-
.../protocol/header/NotificationRequestHeader.java | 19 ++++
.../rocketmq/test/client/rmq/RMQPopClient.java | 8 ++
4 files changed, 137 insertions(+), 14 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 4563132fe4..24b587d1c6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -20,7 +20,11 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.Random;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PollingHeader;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
@@ -29,6 +33,7 @@ import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -37,10 +42,17 @@ import
org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.queue.ReferredIterator;
+import org.rocksdb.RocksDBException;
public class NotificationProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -136,25 +148,60 @@ public class NotificationProcessor implements
NettyRequestProcessor {
int randomQ = random.nextInt(100);
boolean hasMsg = false;
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+
+ SubscriptionData subscriptionData = null;
+ ExpressionMessageFilter messageFilter = null;
+ if (brokerConfig.isUseMessageFilterForNotification() &&
+ StringUtils.isNotEmpty(requestHeader.getExpType()) &&
+ StringUtils.isNotEmpty(requestHeader.getExp())) {
+ try {
+ // origin topic
+ subscriptionData = FilterAPI.build(
+ requestHeader.getTopic(), requestHeader.getExp(),
requestHeader.getExpType());
+
+ ConsumerFilterData consumerFilterData = null;
+ if
(!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ consumerFilterData = ConsumerFilterManager.build(
+ requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getExp(),
+ requestHeader.getExpType(),
System.currentTimeMillis());
+ if (consumerFilterData == null) {
+ POP_LOGGER.warn("Parse the consumer's subscription[{}]
failed, group: {}",
+ requestHeader.getExp(),
requestHeader.getConsumerGroup());
+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+ response.setRemark("parse the consumer's subscription
failed");
+ return response;
+ }
+ }
+ messageFilter = new ExpressionMessageFilter(
+ subscriptionData, consumerFilterData,
brokerController.getConsumerFilterManager());
+ } catch (Exception e) {
+ POP_LOGGER.warn("Parse the consumer's subscription[{}] error,
group: {}", requestHeader.getExp(),
+ requestHeader.getConsumerGroup());
+ response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+ response.setRemark("parse the consumer's subscription failed");
+ return response;
+ }
+ }
+
if (requestHeader.getQueueId() < 0) {
// read all queue
- hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+ hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader,
subscriptionData, messageFilter);
} else {
int queueId = requestHeader.getQueueId();
- hasMsg = hasMsgFromQueue(topicConfig.getTopicName(),
requestHeader, queueId);
+ hasMsg = hasMsgFromQueue(topicConfig.getTopicName(),
requestHeader, queueId, subscriptionData, messageFilter);
}
// if it doesn't have message, fetch retry
if (!hasMsg) {
String retryTopic =
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
- hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
+ hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null,
null);
if (!hasMsg && brokerConfig.isEnableRetryTopicV2() &&
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
String retryTopicConfigV1 =
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
- hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ,
requestHeader);
+ hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ,
requestHeader, null, null);
}
}
if (!hasMsg) {
- PollingResult pollingResult = popLongPollingService.polling(ctx,
request, new PollingHeader(requestHeader));
+ PollingResult pollingResult = popLongPollingService.polling(ctx,
request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
if (pollingResult == PollingResult.POLLING_SUC) {
return null;
} else if (pollingResult == PollingResult.POLLING_FULL) {
@@ -166,19 +213,19 @@ public class NotificationProcessor implements
NettyRequestProcessor {
return response;
}
- private boolean hasMsgFromTopic(String topicName, int randomQ,
NotificationRequestHeader requestHeader)
+ private boolean hasMsgFromTopic(String topicName, int randomQ,
NotificationRequestHeader requestHeader, SubscriptionData subscriptionData,
MessageFilter messageFilter)
throws RemotingCommandException {
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
- return hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+ return hasMsgFromTopic(topicConfig, randomQ, requestHeader,
subscriptionData, messageFilter);
}
- private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ,
NotificationRequestHeader requestHeader)
+ private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ,
NotificationRequestHeader requestHeader, SubscriptionData subscriptionData,
MessageFilter messageFilter)
throws RemotingCommandException {
boolean hasMsg;
if (topicConfig != null) {
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
- hasMsg = hasMsgFromQueue(topicConfig.getTopicName(),
requestHeader, queueId);
+ hasMsg = hasMsgFromQueue(topicConfig.getTopicName(),
requestHeader, queueId, subscriptionData, messageFilter);
if (hasMsg) {
return true;
}
@@ -187,7 +234,7 @@ public class NotificationProcessor implements
NettyRequestProcessor {
return false;
}
- private boolean hasMsgFromQueue(String targetTopic,
NotificationRequestHeader requestHeader, int queueId) throws
RemotingCommandException {
+ private boolean hasMsgFromQueue(String targetTopic,
NotificationRequestHeader requestHeader, int queueId, SubscriptionData
subscriptionData, MessageFilter messageFilter) throws RemotingCommandException {
if (Boolean.TRUE.equals(requestHeader.getOrder())) {
if
(this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(),
requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
return false;
@@ -196,9 +243,40 @@ public class NotificationProcessor implements
NettyRequestProcessor {
long offset = getPopOffset(targetTopic,
requestHeader.getConsumerGroup(), queueId);
try {
long restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic,
queueId) - offset;
+ int maxFilterMessageNum =
this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification();
+ boolean needFilter = restNum < maxFilterMessageNum &&
+ subscriptionData != null &&
+ messageFilter != null &&
+ ExpressionType.isTagType(subscriptionData.getExpressionType());
+ if (needFilter) {
+ ConsumeQueueInterface queue =
this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId);
+ // If the ConsumeQueue doesn't exist, it's not readable.
+ if (queue == null) {
+ return false;
+ }
+ ReferredIterator<CqUnit> iterator = null;
+ try {
+ // In order to take into account both the file CQ and the
Rocksdb CQ,
+ // the count passed here is 32.
+ iterator = queue.iterateFrom(offset, 32);
+ if (iterator != null) {
+ while (iterator.hasNext()) {
+ CqUnit cqUnit = iterator.next();
+ if
(messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(),
cqUnit.getCqExtUnit())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ } finally {
+ if (iterator != null) {
+ iterator.release();
+ }
+ }
+ }
return restNum > 0;
- } catch (ConsumeQueueException e) {
- throw new RemotingCommandException("Failed tp get max offset in
queue", e);
+ } catch (ConsumeQueueException | RocksDBException e) {
+ throw new RemotingCommandException("Failed to get max offset in
queue or iterate in queue", e);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index caee5e45f2..7271c12b18 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.config.ConfigManagerVersion;
import org.apache.rocketmq.common.constant.PermName;
@@ -24,8 +25,6 @@ import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
-import java.util.concurrent.TimeUnit;
-
public class BrokerConfig extends BrokerIdentity {
private String brokerConfigPath = null;
@@ -256,6 +255,9 @@ public class BrokerConfig extends BrokerIdentity {
private boolean useSeparateRetryQueue = false;
private boolean realTimeNotifyConsumerChange = true;
+ private boolean useMessageFilterForNotification = true;
+ private int maxMessageFilterNumForNotification = 64;
+
private boolean litePullMessageEnable = true;
// The period to sync broker member group from namesrv, default value is 1
second
@@ -2407,4 +2409,20 @@ public class BrokerConfig extends BrokerIdentity {
public void setLiteLagLatencyTopK(int liteLagLatencyTopK) {
this.liteLagLatencyTopK = liteLagLatencyTopK;
}
+
+ public boolean isUseMessageFilterForNotification() {
+ return useMessageFilterForNotification;
+ }
+
+ public void setUseMessageFilterForNotification(boolean
useMessageFilterForNotification) {
+ this.useMessageFilterForNotification = useMessageFilterForNotification;
+ }
+
+ public int getMaxMessageFilterNumForNotification() {
+ return maxMessageFilterNumForNotification;
+ }
+
+ public void setMaxMessageFilterNumForNotification(int
maxMessageFilterNumForNotification) {
+ this.maxMessageFilterNumForNotification =
maxMessageFilterNumForNotification;
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
index 0e484f82c0..46c5930c1d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
@@ -44,6 +44,9 @@ public class NotificationRequestHeader extends
TopicQueueRequestHeader {
private Boolean order = Boolean.FALSE;
private String attemptId;
+ private String expType;
+ private String exp;
+
@CFNotNull
@Override
public void checkFields() throws RemotingCommandException {
@@ -108,6 +111,22 @@ public class NotificationRequestHeader extends
TopicQueueRequestHeader {
this.attemptId = attemptId;
}
+ public String getExpType() {
+ return expType;
+ }
+
+ public void setExpType(String expType) {
+ this.expType = expType;
+ }
+
+ public String getExp() {
+ return exp;
+ }
+
+ public void setExp(String exp) {
+ this.exp = exp;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 09c60c0b45..c45a26c59d 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -199,6 +199,12 @@ public class RMQPopClient implements MQConsumer {
public CompletableFuture<Boolean> notification(String brokerAddr, String
topic,
String consumerGroup, int queueId, Boolean order, String attemptId,
long pollTime, long bornTime, long timeoutMillis) {
+ return notification(brokerAddr, topic, consumerGroup, queueId, order,
attemptId, pollTime, bornTime, timeoutMillis, null, null);
+ }
+
+
+ public CompletableFuture<Boolean> notification(String brokerAddr, String
topic,
+ String consumerGroup, int queueId, Boolean order, String attemptId,
long pollTime, long bornTime, long timeoutMillis, String expType, String exp) {
NotificationRequestHeader requestHeader = new
NotificationRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
@@ -207,6 +213,8 @@ public class RMQPopClient implements MQConsumer {
requestHeader.setBornTime(bornTime);
requestHeader.setOrder(order);
requestHeader.setAttemptId(attemptId);
+ requestHeader.setExpType(expType);
+ requestHeader.setExp(exp);
return this.mqClientAPI.notification(brokerAddr, requestHeader,
timeoutMillis);
}
}