This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f7dbc94ad7 Add expression filtering capability to the
pullBlockIfNotFound method of pull consumer (#8024)
f7dbc94ad7 is described below
commit f7dbc94ad715143ad610e026f73a3d60a01204d6
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 19 13:41:43 2024 +0800
Add expression filtering capability to the pullBlockIfNotFound method of
pull consumer (#8024)
---
.../client/consumer/DefaultMQPullConsumer.java | 18 ++++++++++++++++--
.../rocketmq/client/consumer/MQPullConsumer.java | 22 +++++++++++++++++++---
.../impl/consumer/DefaultMQPullConsumerImpl.java | 15 +++++++++++++++
3 files changed, 50 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index b4ca6ab3b3..089fd39b3e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -36,8 +36,8 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
/**
- * @deprecated Default pulling consumer. This class will be removed in 2022,
and a better implementation {@link
- * DefaultLitePullConsumer} is recommend to use in the scenario of actively
pulling messages.
+ * @deprecated Default pulling consumer. This class will be removed in 2022,
and a better implementation
+ * {@link DefaultLitePullConsumer} is recommended for use in the scenario of
actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements
MQPullConsumer {
@@ -375,6 +375,20 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq),
subExpression, offset, maxNums, pullCallback);
}
+ @Override
+ public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq,
MessageSelector selector,
+ long offset, int maxNums,
+ PullCallback pullCallback) throws MQClientException,
RemotingException, InterruptedException {
+
this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq,
selector, offset, maxNums, pullCallback);
+ }
+
+ @Override
+ public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq,
MessageSelector selector,
+ long offset,
+ int maxNums) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
+ return
this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq,
selector, offset, maxNums);
+ }
+
@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws
MQClientException {
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq),
offset);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 868ee93ff8..ee77b12bbc 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -47,8 +47,7 @@ public interface MQPullConsumer extends MQConsumer {
*
* @param mq from which message queue
* @param subExpression subscription expression. It only supports the OR
operation, such as "tag1 || tag2 || tag3" <br> if
- * null or * expression,meaning subscribe
- * all
+ * null or * expression,meaning subscribe all
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
@@ -121,7 +120,7 @@ public interface MQPullConsumer extends MQConsumer {
InterruptedException;
/**
- * Pulling the messages in a async. way. Support message selection
+ * Pulling the messages in an async way. Supports message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final
long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException,
RemotingException,
@@ -150,6 +149,23 @@ public interface MQPullConsumer extends MQConsumer {
final int maxNums, final PullCallback pullCallback) throws
MQClientException, RemotingException,
InterruptedException;
+ /**
+ * Pulling the messages through a callback function; if no message has
arrived, blocking. Supports message selection
+ */
+ void pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final
MessageSelector selector,
+ final long offset, final int maxNums,
+ final PullCallback pullCallback) throws MQClientException,
RemotingException,
+ InterruptedException;
+
+ /**
+ * Pulling the messages; if no message has arrived, blocking for some time. Supports
message selection
+ *
+ * @return The resulting {@code PullResult}
+ */
+ PullResult pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq,
final MessageSelector selector,
+ final long offset, final int maxNums) throws MQClientException,
RemotingException,
+ MQBrokerException, InterruptedException;
+
/**
* Update the offset
*/
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 91d72989ca..c877ccc070 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -589,6 +589,21 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
+ public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq,
MessageSelector messageSelector, long offset, int maxNums,
+ PullCallback pullCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ SubscriptionData subscriptionData = getSubscriptionData(mq,
messageSelector);
+ this.pullAsyncImpl(mq, subscriptionData, offset, maxNums,
pullCallback, true,
+ this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+ }
+
+ public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq,
MessageSelector messageSelector, long offset, int maxNums)
+ throws MQClientException, RemotingException, InterruptedException,
MQBrokerException {
+ SubscriptionData subscriptionData = getSubscriptionData(mq,
messageSelector);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+ }
+
+
public QueryResult queryMessage(String topic, String key, int maxNum, long
begin, long end)
throws MQClientException, InterruptedException {
this.isRunning();