BewareMyPower commented on code in PR #16969:
URL: https://github.com/apache/pulsar/pull/16969#discussion_r950054182
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -712,3 +746,79 @@ uint64_t
MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
});
return numberOfConnectedConsumer;
}
+void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
+ partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+ partitionsUpdateTimer_->async_wait([this](const boost::system::error_code&
ec) {
+ // If two requests call runPartitionUpdateTask at the same time, the
timer will fail, and it
+ // cannot continue at this time, and the request needs to be ignored.
+ if (!ec) {
+ topicPartitionUpdate();
+ }
+ });
+}
+void MultiTopicsConsumerImpl::topicPartitionUpdate() {
+ using namespace std::placeholders;
+ for (const auto& item : topicsPartitions_) {
+ auto topicName = TopicName::get(item.first);
+ auto currentNumPartitions = item.second;
+ lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ std::bind(&MultiTopicsConsumerImpl::handleGetPartitions,
shared_from_this(), topicName,
+ std::placeholders::_1, std::placeholders::_2,
currentNumPartitions));
+ }
+}
+void MultiTopicsConsumerImpl::handleGetPartitions(const TopicNamePtr
topicName, const Result result,
+ const LookupDataResultPtr&
lookupDataResult,
+ int currentNumPartitions) {
+ if (state_ != Ready) {
+ return;
+ }
+ if (!result) {
+ const auto newNumPartitions = static_cast<unsigned
int>(lookupDataResult->getPartitions());
+ if (newNumPartitions > currentNumPartitions) {
+ LOG_INFO("new partition count: " << newNumPartitions
+ << " current partition count: "
<< currentNumPartitions);
+ std::shared_ptr<std::atomic<int>> partitionsNeedCreate =
+ std::make_shared<std::atomic<int>>(newNumPartitions -
currentNumPartitions);
+ ConsumerSubResultPromisePtr topicPromise =
std::make_shared<Promise<Result, Consumer>>();
+ topicsPartitions_[topicName->toString()] = newNumPartitions;
Review Comment:
You must use a lock to protect `topicsPartitions_`. In previous code in
`PartitionedConsumerImpl`, the `consumers_` field (a vector of consumers) is
protected by `consumersMutex`.
```c++
unsigned int numPartitions_;
typedef std::vector<ConsumerImplPtr> ConsumerList;
ConsumerList consumers_;
// consumersMutex_ is used to share consumers_ and numPartitions_
mutable std::mutex consumersMutex_;
mutable std::mutex mutex_;
```
```c++
Lock consumersLock(consumersMutex_);
const auto currentNumPartitions = getNumPartitions(); // READ
numPartitions_
assert(currentNumPartitions == consumers_.size());
if (newNumPartitions > currentNumPartitions) {
LOG_INFO("new partition count: " << newNumPartitions);
numPartitions_ = newNumPartitions; // WRITE numPartitions_
const auto config = getSinglePartitionConsumerConfig();
for (unsigned int i = currentNumPartitions; i <
newNumPartitions; i++) {
auto consumer = newInternalConsumer(i, config);
consumer->start();
consumers_.push_back(consumer); // WRITE (modify) consumers_
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]