This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new e7097ae5 [ISSUE #1156] [C++] SimpleConsumer support subscription
expression (#1158)
e7097ae5 is described below
commit e7097ae507c4e20277d93b31a819e464f3ab57df
Author: lizhimins <[email protected]>
AuthorDate: Thu Dec 25 16:26:33 2025 +0800
[ISSUE #1156] [C++] SimpleConsumer support subscription expression (#1158)
---
cpp/examples/ExampleSimpleConsumer.cpp | 3 +++
cpp/source/rocketmq/SimpleConsumerImpl.cpp | 31 ++++++++++++++++++++++--
cpp/source/rocketmq/include/SimpleConsumerImpl.h | 4 ++-
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp
b/cpp/examples/ExampleSimpleConsumer.cpp
index d89d0b13..4aacfd08 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -40,7 +40,9 @@ int main(int argc, char* argv[]) {
logger.setLevel(Level::Info);
logger.init();
+ // Subscribe with Tag or SQL92 to filter messages on the server side
std::string tag = "*";
+ // auto filter_expression = new FilterExpression("a = 1",
ExpressionType::SQL92);
CredentialsProviderPtr credentials_provider;
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
@@ -57,6 +59,7 @@ int main(int argc, char* argv[]) {
.withSsl(FLAGS_tls)
.build())
.subscribe(FLAGS_topic, tag)
+ // .subscribe(FLAGS_topic, *filter_expression)
.withAwaitDuration(std::chrono::seconds(10))
.build();
std::size_t total = 0;
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 25803429..5bb3eaf9 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -290,6 +290,17 @@ void SimpleConsumerImpl::refreshAssignments() {
}
}
+absl::optional<FilterExpression> SimpleConsumerImpl::getFilterExpression(const
std::string &topic) const {
+ {
+ absl::MutexLock lk(&subscriptions_mtx_);
+ auto it = subscriptions_.find(topic);
+ if (it != subscriptions_.end()) {
+ return absl::make_optional(it->second);
+ }
+ return absl::nullopt;
+ }
+}
+
void SimpleConsumerImpl::receive(std::size_t limit,
std::chrono::milliseconds invisible_duration,
ReceiveCallback callback) {
@@ -331,8 +342,24 @@ void SimpleConsumerImpl::receive(std::size_t limit,
request.mutable_message_queue()->CopyFrom(assignment.message_queue());
request.set_batch_size((int32_t) limit);
- request.mutable_filter_expression()->set_type(rmq::FilterType::TAG);
- request.mutable_filter_expression()->set_expression("*");
+ auto filter_expression = request.mutable_filter_expression();
+ auto&& optional =
getFilterExpression(assignment.message_queue().topic().name());
+ if (optional.has_value()) {
+ auto expression = optional.value();
+ switch (expression.type_) {
+ case TAG:
+ filter_expression->set_type(rmq::FilterType::TAG);
+ filter_expression->set_expression(expression.content_);
+ break;
+ case SQL92:
+ filter_expression->set_type(rmq::FilterType::SQL);
+ filter_expression->set_expression(expression.content_);
+ break;
+ }
+ } else {
+ filter_expression->set_type(rmq::FilterType::TAG);
+ filter_expression->set_expression("*");
+ }
auto invisible_duration_request =
google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 0bcb7fca..22850043 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -66,7 +66,7 @@ protected:
private:
absl::flat_hash_map<std::string, FilterExpression> subscriptions_
GUARDED_BY(subscriptions_mtx_);
- absl::Mutex subscriptions_mtx_;
+ mutable absl::Mutex subscriptions_mtx_;
absl::flat_hash_map<std::string, std::vector<rmq::Assignment>>
topic_assignments_ GUARDED_BY(topic_assignments_mtx_);
absl::Mutex topic_assignments_mtx_;
@@ -92,6 +92,8 @@ private:
void wrapAckRequest(const Message& message, AckMessageRequest& request);
void removeAssignmentsByTopic(const std::string& topic)
LOCKS_EXCLUDED(topic_assignments_mtx_, assignments_mtx_);
+
+ absl::optional<FilterExpression> getFilterExpression(const std::string
&topic) const;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file