This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ccf973081e [ISSUE #9789] LitePullConsumer supports manually adding 
subscription reported in Heartbeat (#9790)
ccf973081e is described below

commit ccf973081e033511537d2221f695fff023cebce9
Author: qianye <[email protected]>
AuthorDate: Thu Nov 20 19:53:21 2025 +0800

    [ISSUE #9789] LitePullConsumer supports manually adding subscription 
reported in Heartbeat (#9790)
---
 .../client/consumer/DefaultLitePullConsumer.java   | 22 ++++++++++++++++++++--
 .../rocketmq/client/consumer/LitePullConsumer.java |  9 +++++----
 .../impl/consumer/DefaultLitePullConsumerImpl.java |  7 ++++---
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 20857f14e0..0840354d79 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,11 +34,13 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 
 import static 
org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData.SUB_ALL;
 
@@ -171,6 +174,8 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
 
     private RPCHook rpcHook;
 
+    private final Set<SubscriptionData> subscriptionsForHeartbeat = new 
HashSet<>();
+
     /**
      * Default constructor.
      */
@@ -618,4 +623,17 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     public void setEnableMsgTrace(boolean enableMsgTrace) {
         this.enableTrace = enableMsgTrace;
     }
+
+    public Set<SubscriptionData> getSubscriptionsForHeartbeat() {
+        return this.subscriptionsForHeartbeat;
+    }
+
+    public synchronized void buildSubscriptionsForHeartbeat(Map<String, 
MessageSelector> messageSelectorMap) throws Exception {
+        this.subscriptionsForHeartbeat.clear();
+        for (Map.Entry<String, MessageSelector> entry : 
messageSelectorMap.entrySet()) {
+            SubscriptionData subscriptionData = FilterAPI.build(entry.getKey(),
+                entry.getValue().getExpression(), 
entry.getValue().getExpressionType());
+            this.subscriptionsForHeartbeat.add(subscriptionData);
+        }
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 6c6a5970a6..d16a7c9535 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -16,14 +16,13 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 
 public interface LitePullConsumer {
 
@@ -107,6 +106,8 @@ public interface LitePullConsumer {
      */
     void setSubExpressionForAssign(final String topic, final String 
subExpression);
 
+    void buildSubscriptionsForHeartbeat(Map<String, MessageSelector> 
subExpressionMap) throws Exception;
+
     /**
      * Fetch data for the topics or partitions specified using assign API
      *
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f85dcc7b45..6ce8b2d1cd 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -63,6 +63,8 @@ import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
@@ -73,8 +75,6 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
@@ -1122,7 +1122,8 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         Set<SubscriptionData> subSet = new HashSet<>();
 
         subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
-
+        
subSet.addAll(this.defaultLitePullConsumer.getSubscriptionsForHeartbeat());
+        
         return subSet;
     }
 

Reply via email to