This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 804847e877 test (#9010)
804847e877 is described below
commit 804847e87765f835afa147887f9507b8a41ae08c
Author: qianye <[email protected]>
AuthorDate: Fri Nov 29 17:18:49 2024 +0800
test (#9010)
---
.../client/consumer/DefaultMQPullConsumer.java | 29 ++++++++++++++++++++++
.../impl/consumer/DefaultMQPullConsumerImpl.java | 9 +++++--
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 9e7a86d9b4..38841e4128 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.client.consumer;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -33,7 +35,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
/**
* @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation
@@ -77,6 +81,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
* Topic set you want to register
*/
private Set<String> registerTopics = new HashSet<>();
+
+ private final Set<SubscriptionData> registerSubscriptions = Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* Queue allocation algorithm
*/
@@ -255,6 +261,29 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.registerTopics = withNamespace(registerTopics);
}
+ public Set<SubscriptionData> getRegisterSubscriptions() {
+ return registerSubscriptions;
+ }
+
+ public void addRegisterSubscriptions(String topic, MessageSelector messageSelector) throws MQClientException {
+ try {
+ if (messageSelector == null) {
+ messageSelector = MessageSelector.byTag(SubscriptionData.SUB_ALL);
+ }
+
+ SubscriptionData subscriptionData = FilterAPI.build(withNamespace(topic),
+ messageSelector.getExpression(), messageSelector.getExpressionType());
+
+ this.registerSubscriptions.add(subscriptionData);
+ } catch (Exception e) {
+ throw new MQClientException("add subscription exception", e);
+ }
+ }
+
+ public void clearRegisterSubscriptions() {
+ this.registerSubscriptions.clear();
+ }
+
/**
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 371a4a0dbd..9d46e28f5d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -54,6 +54,8 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -63,8 +65,6 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use
@@ -356,6 +356,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
@Override
public Set<SubscriptionData> subscriptions() {
+ Set<SubscriptionData> registerSubscriptions = defaultMQPullConsumer.getRegisterSubscriptions();
+ if (registerSubscriptions != null && !registerSubscriptions.isEmpty()) {
+ return registerSubscriptions;
+ }
+
Set<SubscriptionData> result = new HashSet<>();
Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();