This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ec734b  Fix invalid memory access on the first pending batch receive callback (#441)
2ec734b is described below

commit 2ec734b3d6e3bf77bbfa892d808499a2bfa57c06
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Aug 21 20:44:46 2024 +0800

    Fix invalid memory access on the first pending batch receive callback (#441)
---
 lib/ConsumerImplBase.cc | 16 +++++-----------
 lib/ConsumerImplBase.h  |  8 ++++++++
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 851d41e..098f2d5 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -76,10 +76,7 @@ void ConsumerImplBase::doBatchReceiveTimeTask() {
         long diff =
             batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
         if (diff <= 0) {
-            Lock batchOptionLock(batchReceiveOptionMutex_);
-            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
-            batchOptionLock.unlock();
-            batchPendingReceives_.pop();
+            notifyBatchPendingReceivedCallback(popBatchReceiveCallback());
         } else {
             hasPendingReceives = true;
             timeToWaitMs = diff;
@@ -96,20 +93,17 @@ void ConsumerImplBase::doBatchReceiveTimeTask() {
 void ConsumerImplBase::failPendingBatchReceiveCallback() {
     Lock lock(batchPendingReceiveMutex_);
     while (!batchPendingReceives_.empty()) {
-        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
-        batchPendingReceives_.pop();
-        listenerExecutor_->postWork(
-            [opBatchReceive]() { opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {}); });
+        auto callback = popBatchReceiveCallback();
+        listenerExecutor_->postWork([callback]() { callback(ResultAlreadyClosed, {}); });
     }
 }
 
 void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
     Lock lock(batchPendingReceiveMutex_);
     if (!batchPendingReceives_.empty()) {
-        OpBatchReceive& batchReceive = batchPendingReceives_.front();
-        batchPendingReceives_.pop();
+        auto callback = popBatchReceiveCallback();
         lock.unlock();
-        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+        notifyBatchPendingReceivedCallback(callback);
     }
 }
 
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 1b7e86e..79601e4 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -112,6 +112,14 @@ class ConsumerImplBase : public HandlerBase {
 
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
 
+    // Note: it should be protected by batchPendingReceiveMutex_ and called when `batchPendingReceives_` is
+    // not empty
+    BatchReceiveCallback popBatchReceiveCallback() {
+        auto callback = std::move(batchPendingReceives_.front().batchReceiveCallback_);
+        batchPendingReceives_.pop();
+        return callback;
+    }
+
     friend class MultiTopicsConsumerImpl;
     friend class PulsarFriend;
 };

Reply via email to