This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new e7097ae5 [ISSUE #1156] [C++] SimpleConsumer support subscription 
expression (#1158)
e7097ae5 is described below

commit e7097ae507c4e20277d93b31a819e464f3ab57df
Author: lizhimins <[email protected]>
AuthorDate: Thu Dec 25 16:26:33 2025 +0800

    [ISSUE #1156] [C++] SimpleConsumer support subscription expression (#1158)
---
 cpp/examples/ExampleSimpleConsumer.cpp           |  3 +++
 cpp/source/rocketmq/SimpleConsumerImpl.cpp       | 31 ++++++++++++++++++++++--
 cpp/source/rocketmq/include/SimpleConsumerImpl.h |  4 ++-
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/cpp/examples/ExampleSimpleConsumer.cpp 
b/cpp/examples/ExampleSimpleConsumer.cpp
index d89d0b13..4aacfd08 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -40,7 +40,9 @@ int main(int argc, char* argv[]) {
   logger.setLevel(Level::Info);
   logger.init();
 
+  // Subscribe with Tag or SQL92 to filter messages on the server side
   std::string tag = "*";
+  // auto filter_expression = new FilterExpression("a = 1", 
ExpressionType::SQL92);
 
   CredentialsProviderPtr credentials_provider;
   if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
@@ -57,6 +59,7 @@ int main(int argc, char* argv[]) {
                                                     .withSsl(FLAGS_tls)
                                                     .build())
                              .subscribe(FLAGS_topic, tag)
+                             // .subscribe(FLAGS_topic, *filter_expression)
                              .withAwaitDuration(std::chrono::seconds(10))
                              .build();
   std::size_t total = 0;
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp 
b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 25803429..5bb3eaf9 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -290,6 +290,17 @@ void SimpleConsumerImpl::refreshAssignments() {
   }
 }
 
+absl::optional<FilterExpression> SimpleConsumerImpl::getFilterExpression(const 
std::string &topic) const {
+  {
+    absl::MutexLock lk(&subscriptions_mtx_);
+    auto it = subscriptions_.find(topic);
+    if (it != subscriptions_.end()) {
+      return absl::make_optional(it->second);
+    }
+    return absl::nullopt;
+  }
+}
+
 void SimpleConsumerImpl::receive(std::size_t limit,
                                  std::chrono::milliseconds invisible_duration,
                                  ReceiveCallback callback) {
@@ -331,8 +342,24 @@ void SimpleConsumerImpl::receive(std::size_t limit,
   request.mutable_message_queue()->CopyFrom(assignment.message_queue());
   request.set_batch_size((int32_t) limit);
 
-  request.mutable_filter_expression()->set_type(rmq::FilterType::TAG);
-  request.mutable_filter_expression()->set_expression("*");
+  auto filter_expression = request.mutable_filter_expression();
+  auto&& optional = 
getFilterExpression(assignment.message_queue().topic().name());
+  if (optional.has_value()) {
+    auto expression = optional.value();
+    switch (expression.type_) {
+      case TAG:
+        filter_expression->set_type(rmq::FilterType::TAG);
+        filter_expression->set_expression(expression.content_);
+        break;
+      case SQL92:
+        filter_expression->set_type(rmq::FilterType::SQL);
+        filter_expression->set_expression(expression.content_);
+        break;
+    }
+  } else {
+    filter_expression->set_type(rmq::FilterType::TAG);
+    filter_expression->set_expression("*");
+  }
 
   auto invisible_duration_request =
       
google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h 
b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 0bcb7fca..22850043 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -66,7 +66,7 @@ protected:
 
 private:
   absl::flat_hash_map<std::string, FilterExpression> subscriptions_ 
GUARDED_BY(subscriptions_mtx_);
-  absl::Mutex subscriptions_mtx_;
+  mutable absl::Mutex subscriptions_mtx_;
 
   absl::flat_hash_map<std::string, std::vector<rmq::Assignment>> 
topic_assignments_ GUARDED_BY(topic_assignments_mtx_);
   absl::Mutex topic_assignments_mtx_;
@@ -92,6 +92,8 @@ private:
   void wrapAckRequest(const Message& message, AckMessageRequest& request);
 
   void removeAssignmentsByTopic(const std::string& topic) 
LOCKS_EXCLUDED(topic_assignments_mtx_, assignments_mtx_);
+
+  absl::optional<FilterExpression> getFilterExpression(const std::string 
&topic) const;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file

Reply via email to