BewareMyPower commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r976542832
##########
pulsar-client-cpp/tests/PulsarFriend.h:
##########
@@ -57,6 +57,19 @@ class PulsarFriend {
return
std::static_pointer_cast<ConsumerStatsImpl>(consumerImpl->consumerStatsBasePtr_);
}
+ // just param is MultiTopicConsumerImpl
Review Comment:
This comment is confused. I think you want to mean it should only be called
when the underlying consumer of `consumer` is `MultiTopicConsumerImpl`?
If yes, please remove the confused comment and use `dynamic_cast` and null
check instead of `static_cast` in the implementation. `dynamic_cast` can check
the RTTI info so that it will return null if the cast is invalid. e.g.
```c++
#include <iostream>
using namespace std;
struct Base {
virtual void f() {}
};
struct Derived : Base {
void f() override {}
};
void test(Base* base) {
cout << "dynamic_cast: " << dynamic_cast<Derived*>(base)
<< "\nstatic_cast: " << static_cast<Derived*>(base) << endl;
delete base;
}
int main(int argc, char* argv[]) {
test(new Base);
test(new Derived);
return 0;
}
```
You can see the difference from the output:
```
dynamic_cast: 0x0
static_cast: 0x600001eac020
dynamic_cast: 0x600001eac020
static_cast: 0x600001eac020
```
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const
MessageId& msgId, ResultCal
}
}
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList&
messageIdList, ResultCallback callback) {
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed);
+ return;
+ }
+
+ std::map<std::string, MessageIdList> topicToMessageId;
+ for (const MessageId& messageId : messageIdList) {
+ auto topicName = messageId.getTopicName();
+ if (topicToMessageId.count(topicName) == 0) {
+ topicToMessageId.emplace(topicName, std::vector<MessageId>());
+ }
+ topicToMessageId[topicName].emplace_back(messageId);
+ }
+
+ std::shared_ptr<std::atomic<int>> needCallBack =
+ std::make_shared<std::atomic<int>>(topicToMessageId.size());
+ auto cb = [callback, needCallBack](Result result) {
+ if (result != ResultOk) {
+ callback(result);
+ }
+ needCallBack->fetch_sub(1);
+ if (needCallBack->load() == 0) {
+ callback(result);
+ }
+ };
+ for (const auto& kv : topicToMessageId) {
+ auto optConsumer = consumers_.find(kv.first);
+ if (optConsumer.is_present()) {
+ for (const auto& msgId : kv.second) {
+ unAckedMessageTrackerPtr_->remove(msgId);
+ }
Review Comment:
The same as my comment before, calling `remove` in a loop is not thread safe
for `unAckedMessageTrackerPtr_`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]