This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ccf973081e [ISSUE #9789] LitePullConsumer supports manually adding
subscription reported in Heartbeat (#9790)
ccf973081e is described below
commit ccf973081e033511537d2221f695fff023cebce9
Author: qianye <[email protected]>
AuthorDate: Thu Nov 20 19:53:21 2025 +0800
[ISSUE #9789] LitePullConsumer supports manually adding subscription
reported in Heartbeat (#9790)
---
.../client/consumer/DefaultLitePullConsumer.java | 22 ++++++++++++++++++++--
.../rocketmq/client/consumer/LitePullConsumer.java | 9 +++++----
.../impl/consumer/DefaultLitePullConsumerImpl.java | 7 ++++---
3 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 20857f14e0..0840354d79 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,11 +34,13 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import static org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData.SUB_ALL;
@@ -171,6 +174,8 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private RPCHook rpcHook;
+ private final Set<SubscriptionData> subscriptionsForHeartbeat = new HashSet<>();
+
/**
* Default constructor.
*/
@@ -618,4 +623,17 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableTrace = enableMsgTrace;
}
+
+ public Set<SubscriptionData> getSubscriptionsForHeartbeat() {
+ return this.subscriptionsForHeartbeat;
+ }
+
+ public synchronized void buildSubscriptionsForHeartbeat(Map<String, MessageSelector> messageSelectorMap) throws Exception {
+ this.subscriptionsForHeartbeat.clear();
+ for (Map.Entry<String, MessageSelector> entry : messageSelectorMap.entrySet()) {
+ SubscriptionData subscriptionData = FilterAPI.build(entry.getKey(),
+ entry.getValue().getExpression(), entry.getValue().getExpressionType());
+ this.subscriptionsForHeartbeat.add(subscriptionData);
+ }
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 6c6a5970a6..d16a7c9535 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -16,14 +16,13 @@
*/
package org.apache.rocketmq.client.consumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
public interface LitePullConsumer {
@@ -107,6 +106,8 @@ public interface LitePullConsumer {
*/
void setSubExpressionForAssign(final String topic, final String subExpression);
+ void buildSubscriptionsForHeartbeat(Map<String, MessageSelector> subExpressionMap) throws Exception;
+
/**
* Fetch data for the topics or partitions specified using assign API
*
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f85dcc7b45..6ce8b2d1cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -63,6 +63,8 @@ import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
@@ -73,8 +75,6 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
@@ -1122,7 +1122,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
Set<SubscriptionData> subSet = new HashSet<>();
subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
-
+ subSet.addAll(this.defaultLitePullConsumer.getSubscriptionsForHeartbeat());
+
return subSet;
}