This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2ec734b Fix invalid memory access on the first pending batch receive callback (#441)
2ec734b is described below
commit 2ec734b3d6e3bf77bbfa892d808499a2bfa57c06
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Aug 21 20:44:46 2024 +0800
Fix invalid memory access on the first pending batch receive callback (#441)
---
lib/ConsumerImplBase.cc | 16 +++++-----------
lib/ConsumerImplBase.h | 8 ++++++++
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 851d41e..098f2d5 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -76,10 +76,7 @@ void ConsumerImplBase::doBatchReceiveTimeTask() {
long diff =
batchReceivePolicy_.getTimeoutMs() -
(TimeUtils::currentTimeMillis() - batchReceive.createAt_);
if (diff <= 0) {
- Lock batchOptionLock(batchReceiveOptionMutex_);
- notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
- batchOptionLock.unlock();
- batchPendingReceives_.pop();
+ notifyBatchPendingReceivedCallback(popBatchReceiveCallback());
} else {
hasPendingReceives = true;
timeToWaitMs = diff;
@@ -96,20 +93,17 @@ void ConsumerImplBase::doBatchReceiveTimeTask() {
void ConsumerImplBase::failPendingBatchReceiveCallback() {
Lock lock(batchPendingReceiveMutex_);
while (!batchPendingReceives_.empty()) {
- OpBatchReceive opBatchReceive = batchPendingReceives_.front();
- batchPendingReceives_.pop();
- listenerExecutor_->postWork(
- [opBatchReceive]() { opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {}); });
+ auto callback = popBatchReceiveCallback();
+ listenerExecutor_->postWork([callback]() { callback(ResultAlreadyClosed, {}); });
}
}
void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
Lock lock(batchPendingReceiveMutex_);
if (!batchPendingReceives_.empty()) {
- OpBatchReceive& batchReceive = batchPendingReceives_.front();
- batchPendingReceives_.pop();
+ auto callback = popBatchReceiveCallback();
lock.unlock();
- notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+ notifyBatchPendingReceivedCallback(callback);
}
}
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 1b7e86e..79601e4 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -112,6 +112,14 @@ class ConsumerImplBase : public HandlerBase {
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
+ // Note: it should be protected by batchPendingReceiveMutex_ and called when `batchPendingReceives_` is
+ // not empty
+ BatchReceiveCallback popBatchReceiveCallback() {
+ auto callback = std::move(batchPendingReceives_.front().batchReceiveCallback_);
+ batchPendingReceives_.pop();
+ return callback;
+ }
+
friend class MultiTopicsConsumerImpl;
friend class PulsarFriend;
};