This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/re_dev by this push:
new f3a9097 fix: update topic subscribe info from cache first
f3a9097 is described below
commit f3a9097f612acf6db90c6f62e1a2fe87277e5e81
Author: James Yin <[email protected]>
AuthorDate: Mon Mar 29 13:49:17 2021 +0800
fix: update topic subscribe info from cache first
---
src/consumer/DefaultMQPushConsumerImpl.cpp | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index bf00cc2..77ba017 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -321,9 +321,16 @@ void DefaultMQPushConsumerImpl::updateTopicSubscribeInfoWhenSubscriptionChanged(
auto& subTable = rebalance_impl_->getSubscriptionInner();
for (const auto& it : subTable) {
const auto& topic = it.first;
- bool ret = client_instance_->updateTopicRouteInfoFromNameServer(topic);
- if (!ret) {
- LOG_WARN_NEW("The topic:[{}] not exist", topic);
+ auto topic_route_data = client_instance_->getTopicRouteData(topic);
+ if (topic_route_data != nullptr) {
+ std::vector<MQMessageQueue> subscribeInfo =
+ MQClientInstance::topicRouteData2TopicSubscribeInfo(topic, topic_route_data);
+ updateTopicSubscribeInfo(topic, subscribeInfo);
+ } else {
+ bool ret = client_instance_->updateTopicRouteInfoFromNameServer(topic);
+ if (!ret) {
+ LOG_WARN_NEW("The topic[{}] not exist, or its route data not changed", topic);
+ }
}
}
}