BewareMyPower commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r976542832


##########
pulsar-client-cpp/tests/PulsarFriend.h:
##########
@@ -57,6 +57,19 @@ class PulsarFriend {
         return std::static_pointer_cast<ConsumerStatsImpl>(consumerImpl->consumerStatsBasePtr_);
     }
 
+    // just param is MultiTopicConsumerImpl

Review Comment:
   This comment is confusing. I think you mean that this method should only be
called when the underlying consumer of `consumer` is a `MultiTopicsConsumerImpl`?
   
   If so, please remove the comment and use `dynamic_cast` with a null check
instead of `static_cast` in the implementation. `dynamic_cast` checks the RTTI
info, so for pointer types it returns null if the cast is invalid, e.g.
   
   ```c++
   #include <iostream>
   using namespace std;
   
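   struct Base {
       // Virtual destructor so that `delete base` is well defined for Derived objects
       virtual ~Base() = default;
       virtual void f() {}
   };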
   
   struct Derived : Base {
       void f() override {}
   };
   
   void test(Base* base) {
       cout << "dynamic_cast: " << dynamic_cast<Derived*>(base)
            << "\nstatic_cast: " << static_cast<Derived*>(base) << endl;
       delete base;
   }
   
   int main(int argc, char* argv[]) {
       test(new Base);
       test(new Derived);
       return 0;
   }
   ```
   
   You can see the difference from the output:
   
   ```
   dynamic_cast: 0x0
   static_cast: 0x600001eac020
   dynamic_cast: 0x600001eac020
   static_cast: 0x600001eac020
   ```
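   
   Since the helper here works with `std::shared_ptr`, the same check can be
written with `std::dynamic_pointer_cast`. The following is only a minimal sketch:
`ConsumerBase`, `SingleTopicConsumer`, `MultiTopicsConsumer` and `asMultiTopics`
are hypothetical stand-ins for illustration, not the real Pulsar classes.
   
   ```c++
   #include <memory>
   
   // Hypothetical stand-ins for the real consumer class hierarchy.
   struct ConsumerBase {
       virtual ~ConsumerBase() = default;
   };
   
   struct SingleTopicConsumer : ConsumerBase {};
   struct MultiTopicsConsumer : ConsumerBase {};
   
   // Returns nullptr when `consumer` is not a MultiTopicsConsumer, so the
   // caller can check the result instead of relying on a code comment.
   std::shared_ptr<MultiTopicsConsumer> asMultiTopics(
       const std::shared_ptr<ConsumerBase>& consumer) {
       return std::dynamic_pointer_cast<MultiTopicsConsumer>(consumer);
   }
   ```
   
   A test can then assert on the returned pointer and fail cleanly when it is
handed the wrong consumer type.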
   



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        if (topicToMessageId.count(topicName) == 0) {
+            topicToMessageId.emplace(topicName, std::vector<MessageId>());
+        }
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    std::shared_ptr<std::atomic<int>> needCallBack =
+        std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    auto cb = [callback, needCallBack](Result result) {
+        if (result != ResultOk) {
+            callback(result);
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(result);
+        }
+    };
+    for (const auto& kv : topicToMessageId) {
+        auto optConsumer = consumers_.find(kv.first);
+        if (optConsumer.is_present()) {
+            for (const auto& msgId : kv.second) {
+                unAckedMessageTrackerPtr_->remove(msgId);
+            }

Review Comment:
   As in my earlier comment, calling `remove` in a loop is not thread safe for
`unAckedMessageTrackerPtr_`.
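
   One possible way to address this, assuming the tracker guards its state with
a mutex, is to remove the whole batch under a single lock. The sketch below uses
a hypothetical `Tracker` class with a hypothetical `removeAll` method, and models
message IDs as strings; it is not the actual `UnAckedMessageTracker` API.
   
   ```c++
   #include <cstddef>
   #include <mutex>
   #include <set>
   #include <string>
   #include <vector>
   
   // Hypothetical tracker; message IDs are modeled as strings for brevity.
   class Tracker {
      public:
       void add(const std::string& id) {
           std::lock_guard<std::mutex> lock(mutex_);
           ids_.insert(id);
       }
   
       // Removes the whole batch under one lock acquisition, so no other
       // thread can observe the set with only part of the batch removed.
       void removeAll(const std::vector<std::string>& ids) {
           std::lock_guard<std::mutex> lock(mutex_);
           for (const auto& id : ids) {
               ids_.erase(id);
           }
       }
   
       std::size_t size() const {
           std::lock_guard<std::mutex> lock(mutex_);
           return ids_.size();
       }
   
      private:
       mutable std::mutex mutex_;
       std::set<std::string> ids_;
   };
   ```
   
   With such a method, the inner loop over `kv.second` would become a single
call that passes the whole list.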


