BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010280775
##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const
MessageId& msgId, ResultCal
}
}
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList&
messageIdList, ResultCallback callback) {
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed);
+ return;
+ }
+
+ std::unordered_map<std::string, MessageIdList> topicToMessageId;
+ for (const MessageId& messageId : messageIdList) {
+ auto topicName = messageId.getTopicName();
+ topicToMessageId[topicName].emplace_back(messageId);
+ }
+
+ auto needCallBack =
std::make_shared<std::atomic<int>>(topicToMessageId.size());
+ Result res = ResultOk;
+ auto cb = [callback, needCallBack, &res](Result result) {
+ if (result != ResultOk) {
+ res = result;
+ }
+ needCallBack->fetch_sub(1);
+ if (needCallBack->load() == 0) {
+ callback(res);
+ }
Review Comment:
`res` should be defined inside the lambda, not outside. Because it's only
used in the lambda.
BTW, please keep it in mind that **never capture a local variable by
reference**.
1. The lifetime of the local variable will end after going out of the scope,
while the lambda could be invoked in another thread.
2. Since multiple threads can access the shared local variable, it must be
atomic.
That's also why `std::shared_ptr<std::atomic<T>>` is used in other places.
1. Capturing a `std::shared_ptr<U>` **by value** will increase the
reference count and extend the lifetime of the `U` instance until all lambdas
finished.
2. `U` is `std::atomic<T>`, which is thread safe to access among different
threads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]