BewareMyPower commented on code in PR #16969:
URL: https://github.com/apache/pulsar/pull/16969#discussion_r955070110


##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -52,6 +53,13 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
+    auto partitionsUpdateInterval = static_cast<unsigned 
int>(client_->conf().getPartitionsUpdateInterval());
+    if (partitionsUpdateInterval > 0) {
+        listenerExecutor_ = client_->getListenerExecutorProvider()->get();

Review Comment:
   ```suggestion
   ```
   
   `listenerExecutor_` has already been initialized in the member initializer 
list, so this assignment is redundant and can be removed. Consider declaring 
the member `const` so that it cannot be reassigned after initialization:
   
   ```c++
   const ExecutorServicePtr listenerExecutor_;
   ```



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -134,11 +142,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void handleOneTopicUnsubscribedAsync(Result result, 
std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                          int numberPartitions, TopicNamePtr 
topicNamePtr,
                                          std::string& topicPartitionName, 
ResultCallback callback);
+    void runPartitionUpdateTask();
+    void topicPartitionUpdate();
+    void handleGetPartitions(const TopicNamePtr topicName, const Result result,
+                             const LookupDataResultPtr& lookupDataResult, int 
currentNumPartitions);
+    void subscribeSingleNewConsumer(const int numPartitions, TopicNamePtr 
topicName, int partitionIndex,

Review Comment:
   ```suggestion
       void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr 
topicName, int partitionIndex,
   ```



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -52,6 +52,13 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     MultiTopicsConsumerImpl(ClientImplPtr client, const 
std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr 
topicName,
                             const ConsumerConfiguration& conf, const 
LookupServicePtr lookupServicePtr_);
+    MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, 
const int numPartitions,
+                            const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
+                            const LookupServicePtr lookupServicePtr)

Review Comment:
   ```suggestion
       MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, 
int numPartitions,
                               const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
                               LookupServicePtr lookupServicePtr)
   ```



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -122,9 +131,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const 
std::string& topic,
                                   std::shared_ptr<std::atomic<int>> 
topicsNeedCreate);
-    void subscribeTopicPartitions(const Result result, const 
LookupDataResultPtr partitionMetadata,
-                                  TopicNamePtr topicName, const std::string& 
consumerName,
-                                  ConsumerConfiguration conf,
+    void subscribeTopicPartitions(const int numPartitions, TopicNamePtr 
topicName,
+                                  const std::string& consumerName,
                                   ConsumerSubResultPromisePtr 
topicSubResultPromise);

Review Comment:
   ```suggestion
       void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, 
const std::string& consumerName,
                                     ConsumerSubResultPromisePtr 
topicSubResultPromise);
   ```



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h:
##########
@@ -134,11 +142,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void handleOneTopicUnsubscribedAsync(Result result, 
std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                          int numberPartitions, TopicNamePtr 
topicNamePtr,
                                          std::string& topicPartitionName, 
ResultCallback callback);
+    void runPartitionUpdateTask();
+    void topicPartitionUpdate();
+    void handleGetPartitions(const TopicNamePtr topicName, const Result result,

Review Comment:
   ```suggestion
       void handleGetPartitions(TopicNamePtr topicName, Result result,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to