BewareMyPower commented on code in PR #16969:
URL: https://github.com/apache/pulsar/pull/16969#discussion_r955070110
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -52,6 +53,13 @@
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
+ auto partitionsUpdateInterval = static_cast<unsigned
int>(client_->conf().getPartitionsUpdateInterval());
+ if (partitionsUpdateInterval > 0) {
+ listenerExecutor_ = client_->getListenerExecutorProvider()->get();
Review Comment:
```suggestion
```
`listenerExecutor_` has already been initialized in the member
initializer list, so this assignment is redundant. Maybe we should declare
`listenerExecutor_` as `const` so that it cannot be reassigned after initialization.
```c++
const ExecutorServicePtr listenerExecutor_;
```
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -134,11 +142,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void handleOneTopicUnsubscribedAsync(Result result,
std::shared_ptr<std::atomic<int>> consumerUnsubed,
int numberPartitions, TopicNamePtr
topicNamePtr,
std::string& topicPartitionName,
ResultCallback callback);
+ void runPartitionUpdateTask();
+ void topicPartitionUpdate();
+ void handleGetPartitions(const TopicNamePtr topicName, const Result result,
+ const LookupDataResultPtr& lookupDataResult, int
currentNumPartitions);
+ void subscribeSingleNewConsumer(const int numPartitions, TopicNamePtr
topicName, int partitionIndex,
Review Comment:
```suggestion
void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr
topicName, int partitionIndex,
```
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -52,6 +52,13 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
MultiTopicsConsumerImpl(ClientImplPtr client, const
std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr
topicName,
const ConsumerConfiguration& conf, const
LookupServicePtr lookupServicePtr_);
+ MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
const int numPartitions,
+ const std::string& subscriptionName, const
ConsumerConfiguration& conf,
+ const LookupServicePtr lookupServicePtr)
Review Comment:
```suggestion
MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
int numPartitions,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
```
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -122,9 +131,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void handleOneTopicSubscribed(Result result, Consumer consumer, const
std::string& topic,
std::shared_ptr<std::atomic<int>>
topicsNeedCreate);
- void subscribeTopicPartitions(const Result result, const
LookupDataResultPtr partitionMetadata,
- TopicNamePtr topicName, const std::string&
consumerName,
- ConsumerConfiguration conf,
+ void subscribeTopicPartitions(const int numPartitions, TopicNamePtr
topicName,
+ const std::string& consumerName,
ConsumerSubResultPromisePtr
topicSubResultPromise);
Review Comment:
```suggestion
void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
const std::string& consumerName,
ConsumerSubResultPromisePtr
topicSubResultPromise);
```
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -134,11 +142,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void handleOneTopicUnsubscribedAsync(Result result,
std::shared_ptr<std::atomic<int>> consumerUnsubed,
int numberPartitions, TopicNamePtr
topicNamePtr,
std::string& topicPartitionName,
ResultCallback callback);
+ void runPartitionUpdateTask();
+ void topicPartitionUpdate();
+ void handleGetPartitions(const TopicNamePtr topicName, const Result result,
Review Comment:
```suggestion
void handleGetPartitions(TopicNamePtr topicName, Result result,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]