This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c38f7d55 fix #639 fix semantics of topicOfInterest (#642)
c38f7d55 is described below
commit c38f7d55cbceebb122c47f25a6cad72af210066c
Author: Zhanhui Li <[email protected]>
AuthorDate: Fri Dec 1 15:48:59 2023 +0800
fix #639 fix semantics of topicOfInterest (#642)
Signed-off-by: Zhanhui Li <[email protected]>
---
cpp/.gitignore | 1 +
cpp/source/rocketmq/Producer.cpp | 2 +-
cpp/source/rocketmq/ProducerImpl.cpp | 18 ++++++++++++++++--
cpp/source/rocketmq/PushConsumerImpl.cpp | 2 +-
cpp/source/rocketmq/SimpleConsumerImpl.cpp | 2 +-
cpp/source/rocketmq/include/ClientImpl.h | 2 +-
cpp/source/rocketmq/include/ProducerImpl.h | 4 +++-
cpp/source/rocketmq/include/PushConsumerImpl.h | 2 +-
cpp/source/rocketmq/include/SimpleConsumerImpl.h | 2 +-
9 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/cpp/.gitignore b/cpp/.gitignore
index 23e0e933..b7f10c0f 100644
--- a/cpp/.gitignore
+++ b/cpp/.gitignore
@@ -18,3 +18,4 @@ bazel-rocketmq-client-cpp
/bazel-*
/compile_commands.json
/.cache/
+.clangd
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 8620f681..78d812ed 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -88,7 +88,7 @@ ProducerBuilder&
ProducerBuilder::withConfiguration(Configuration configuration)
}
ProducerBuilder& ProducerBuilder::withTopics(const std::vector<std::string>&
topics) {
- impl_->topicsOfInterest(topics);
+ impl_->withTopics(topics);
return *this;
}
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index 32b2ecad..73130161 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -16,6 +16,7 @@
*/
#include "ProducerImpl.h"
+#include <algorithm>
#include <apache/rocketmq/v2/definition.pb.h>
#include <atomic>
@@ -575,9 +576,22 @@ void
ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
}
}
-void ProducerImpl::topicsOfInterest(std::vector<std::string> topics) {
+void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
- topics_.swap(topics);
+ for (auto& topic : topics_) {
+ if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
+ topics.push_back(topic);
+ }
+ }
+}
+
+void ProducerImpl::withTopics(const std::vector<std::string> &topics) {
+ absl::MutexLock lk(&topics_mtx_);
+ for (auto &topic: topics) {
+ if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) {
+ topics_.push_back(topic);
+ }
+ }
}
void ProducerImpl::buildClientSettings(rmq::Settings& settings) {
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp
b/cpp/source/rocketmq/PushConsumerImpl.cpp
index d73407b4..505854db 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -47,7 +47,7 @@ PushConsumerImpl::~PushConsumerImpl() {
shutdown();
}
-void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
+void PushConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topic_filter_expression_table_mtx_);
for (const auto& entry : topic_filter_expression_table_) {
topics.push_back(entry.first);
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 09acb7ab..7a1b3edf 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings&
settings) {
}
}
-void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
+void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&subscriptions_mtx_);
for (const auto& entry : subscriptions_) {
if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h
b/cpp/source/rocketmq/include/ClientImpl.h
index 70dc5382..c266047a 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -167,7 +167,7 @@ protected:
absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_
GUARDED_BY(session_map_mtx_);
absl::Mutex session_map_mtx_;
- virtual void topicsOfInterest(std::vector<std::string> topics) {
+ virtual void topicsOfInterest(std::vector<std::string> &topics) {
}
void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h
b/cpp/source/rocketmq/include/ProducerImpl.h
index ad9b24d5..d7260a93 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -107,7 +107,9 @@ public:
void buildClientSettings(rmq::Settings& settings) override;
- void topicsOfInterest(std::vector<std::string> topics) override
LOCKS_EXCLUDED(topics_mtx_);
+ void topicsOfInterest(std::vector<std::string> &topics) override
LOCKS_EXCLUDED(topics_mtx_);
+
+ void withTopics(const std::vector<std::string> &topics)
LOCKS_EXCLUDED(topics_mtx_);
const PublishStats& stats() const {
return stats_;
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h
b/cpp/source/rocketmq/include/PushConsumerImpl.h
index d512f4c8..7a4ff1a3 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -52,7 +52,7 @@ public:
void prepareHeartbeatData(HeartbeatRequest& request) override;
- void topicsOfInterest(std::vector<std::string> topics) override
LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
+ void topicsOfInterest(std::vector<std::string> &topics) override
LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
void start() override;
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index a20cce56..45aa61b9 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -66,7 +66,7 @@ public:
}
protected:
- void topicsOfInterest(std::vector<std::string> topics) override;
+ void topicsOfInterest(std::vector<std::string> &topics) override;
private:
absl::flat_hash_map<std::string, FilterExpression> subscriptions_
GUARDED_BY(subscriptions_mtx_);