This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 648b48b  Fix a null ACK grouping tracker can be accessed after 
consumer is closed (#517)
648b48b is described below

commit 648b48be58f65a15f45d952986f3d9d5936f75a8
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Oct 27 15:20:28 2025 +0800

    Fix a null ACK grouping tracker can be accessed after consumer is closed 
(#517)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/516
---
 lib/AckGroupingTracker.cc         | 141 --------------------------
 lib/AckGroupingTracker.h          |  40 +++-----
 lib/AckGroupingTrackerDisabled.cc |  29 ++++--
 lib/AckGroupingTrackerEnabled.cc  |  70 +++++++++----
 lib/AckGroupingTrackerEnabled.h   |  26 ++---
 lib/ConsumerImpl.cc               | 202 +++++++++++++++++++++++++-------------
 lib/ConsumerImpl.h                |  22 ++++-
 tests/AcknowledgeTest.cc          |  28 ++++++
 tests/BasicEndToEndTest.cc        |  45 +++------
 tests/ConsumerTest.cc             |  25 +++--
 10 files changed, 311 insertions(+), 317 deletions(-)

diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
deleted file mode 100644
index 8a9ea0d..0000000
--- a/lib/AckGroupingTracker.cc
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "AckGroupingTracker.h"
-
-#include <atomic>
-#include <limits>
-#include <set>
-
-#include "BitSet.h"
-#include "ChunkMessageIdImpl.h"
-#include "ClientConnection.h"
-#include "Commands.h"
-#include "LogUtils.h"
-#include "MessageIdImpl.h"
-
-namespace pulsar {
-
-DECLARE_LOG_OBJECT();
-
-void AckGroupingTracker::doImmediateAck(const MessageId& msgId, const 
ResultCallback& callback,
-                                        CommandAck_AckType ackType) const {
-    const auto cnx = connectionSupplier_();
-    if (!cnx) {
-        LOG_DEBUG("Connection is not ready, ACK failed for " << msgId);
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
-        return;
-    }
-    if (ackType == CommandAck_AckType_Individual) {
-        // If it's individual ack, we need to acknowledge all message IDs in a 
chunked message Id
-        // If it's cumulative ack, we only need to ack the last message ID of 
a chunked message.
-        // ChunkedMessageId return last chunk message ID by default, so we 
don't need to handle it.
-        if (auto chunkMessageId =
-                
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId)))
 {
-            auto msgIdList = chunkMessageId->getChunkedMessageIds();
-            doImmediateAck(std::set<MessageId>(msgIdList.begin(), 
msgIdList.end()), callback);
-            return;
-        }
-    }
-    const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
-    if (waitResponse_) {
-        const auto requestId = requestIdSupplier_();
-        cnx->sendRequestWithId(
-               Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType, requestId),
-               requestId)
-            .addListener([callback](Result result, const ResponseData&) {
-                if (callback) {
-                    callback(result);
-                }
-            });
-    } else {
-        cnx->sendCommand(Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType));
-        if (callback) {
-            callback(ResultOk);
-        }
-    }
-}
-
-static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& 
msgIds) {
-    bool first = true;
-    for (auto&& msgId : msgIds) {
-        if (first) {
-            first = false;
-        } else {
-            os << ", ";
-        }
-        os << "[" << msgId << "]";
-    }
-    return os;
-}
-
-void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds,
-                                        const ResultCallback& callback) const {
-    const auto cnx = connectionSupplier_();
-    if (!cnx) {
-        LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
-        return;
-    }
-
-    std::set<MessageId> ackMsgIds;
-
-    for (const auto& msgId : msgIds) {
-        if (auto chunkMessageId =
-                
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId)))
 {
-            auto msgIdList = chunkMessageId->getChunkedMessageIds();
-            ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
-        } else {
-            ackMsgIds.insert(msgId);
-        }
-    }
-
-    if 
(Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion()))
 {
-        if (waitResponse_) {
-            const auto requestId = requestIdSupplier_();
-            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds, requestId), requestId)
-                .addListener([callback](Result result, const ResponseData&) {
-                    if (callback) {
-                        callback(result);
-                    }
-                });
-        } else {
-            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds));
-            if (callback) {
-                callback(ResultOk);
-            }
-        }
-    } else {
-        auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
-        auto wrappedCallback = [callback, count](Result result) {
-            if (--*count == 0 && callback) {
-                callback(result);
-            }
-        };
-        for (auto&& msgId : ackMsgIds) {
-            doImmediateAck(msgId, wrappedCallback, 
CommandAck_AckType_Individual);
-        }
-    }
-}
-
-}  // namespace pulsar
diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h
index f62492f..d00c3a2 100644
--- a/lib/AckGroupingTracker.h
+++ b/lib/AckGroupingTracker.h
@@ -22,11 +22,7 @@
 #include <pulsar/MessageId.h>
 #include <pulsar/Result.h>
 
-#include <cstdint>
 #include <functional>
-#include <set>
-
-#include "ProtoApiEnums.h"
 
 namespace pulsar {
 
@@ -34,6 +30,9 @@ class ClientConnection;
 using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
 using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
 using ResultCallback = std::function<void(Result)>;
+class ConsumerImpl;
+using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
+using ConsumerImplWeakPtr = std::weak_ptr<ConsumerImpl>;
 
 /**
  * @class AckGroupingTracker
@@ -42,19 +41,12 @@ using ResultCallback = std::function<void(Result)>;
  */
 class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracker> {
    public:
-    AckGroupingTracker(std::function<ClientConnectionPtr()> connectionSupplier,
-                       std::function<uint64_t()> requestIdSupplier, uint64_t 
consumerId, bool waitResponse)
-        : connectionSupplier_(std::move(connectionSupplier)),
-          requestIdSupplier_(std::move(requestIdSupplier)),
-          consumerId_(consumerId),
-          waitResponse_(waitResponse) {}
-
     virtual ~AckGroupingTracker() = default;
 
     /**
      * Start tracking the ACK requests.
      */
-    virtual void start() {}
+    virtual void start(const ConsumerImplPtr& consumer) { consumer_ = 
consumer; }
 
     /**
      * Since ACK requests are grouped and delayed, we need to do some 
best-effort duplicate check to
@@ -72,7 +64,9 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
      * @param[in] callback the callback that is triggered when the message is 
acknowledged
      */
     virtual void addAcknowledge(const MessageId& msgId, const ResultCallback& 
callback) {
-        callback(ResultOk);
+        if (callback) {
+            callback(ResultOk);
+        }
     }
 
     /**
@@ -81,7 +75,9 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
      * @param[in] callback the callback that is triggered when the messages 
are acknowledged
      */
     virtual void addAcknowledgeList(const MessageIdList& msgIds, const 
ResultCallback& callback) {
-        callback(ResultOk);
+        if (callback) {
+            callback(ResultOk);
+        }
     }
 
     /**
@@ -90,7 +86,9 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
      * @param[in] callback the callback that is triggered when the message is 
acknowledged
      */
     virtual void addAcknowledgeCumulative(const MessageId& msgId, const 
ResultCallback& callback) {
-        callback(ResultOk);
+        if (callback) {
+            callback(ResultOk);
+        }
     }
 
     /**
@@ -99,18 +97,10 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
      */
     virtual void flushAndClean() {}
 
-   protected:
-    void doImmediateAck(const MessageId& msgId, const ResultCallback& callback,
-                        CommandAck_AckType ackType) const;
-    void doImmediateAck(const std::set<MessageId>& msgIds, const 
ResultCallback& callback) const;
-
-   private:
-    const std::function<ClientConnectionPtr()> connectionSupplier_;
-    const std::function<uint64_t()> requestIdSupplier_;
-    const uint64_t consumerId_;
+    virtual void close() {}
 
    protected:
-    const bool waitResponse_;
+    ConsumerImplWeakPtr consumer_;
 
 };  // class AckGroupingTracker
 
diff --git a/lib/AckGroupingTrackerDisabled.cc 
b/lib/AckGroupingTrackerDisabled.cc
index d20de44..92cdfba 100644
--- a/lib/AckGroupingTrackerDisabled.cc
+++ b/lib/AckGroupingTrackerDisabled.cc
@@ -19,26 +19,41 @@
 
 #include "AckGroupingTrackerDisabled.h"
 
-#include "ProtoApiEnums.h"
+#include "ConsumerImpl.h"
 
 namespace pulsar {
 
 void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, const 
ResultCallback& callback) {
-    doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
+    auto consumer = consumer_.lock();
+    if (consumer && !consumer->isClosingOrClosed()) {
+        consumer->doImmediateAck(msgId, callback, 
CommandAck_AckType_Individual);
+    } else if (callback) {
+        callback(ResultAlreadyClosed);
+    }
 }
 
 void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& 
msgIds,
                                                     const ResultCallback& 
callback) {
-    std::set<MessageId> msgIdSet;
-    for (auto&& msgId : msgIds) {
-        msgIdSet.emplace(msgId);
+    auto consumer = consumer_.lock();
+    if (consumer && !consumer->isClosingOrClosed()) {
+        std::set<MessageId> uniqueMsgIds(msgIds.begin(), msgIds.end());
+        for (auto&& msgId : msgIds) {
+            uniqueMsgIds.insert(msgId);
+        }
+        consumer->doImmediateAck(uniqueMsgIds, callback);
+    } else if (callback) {
+        callback(ResultAlreadyClosed);
     }
-    doImmediateAck(msgIdSet, callback);
 }
 
 void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& 
msgId,
                                                           const 
ResultCallback& callback) {
-    doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
+    auto consumer = consumer_.lock();
+    if (consumer && !consumer->isClosingOrClosed()) {
+        consumer->doImmediateAck(msgId, callback, 
CommandAck_AckType_Cumulative);
+    } else if (callback) {
+        callback(ResultAlreadyClosed);
+    }
 }
 
 }  // namespace pulsar
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index d88426e..3a2a35d 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -23,11 +23,8 @@
 #include <memory>
 #include <mutex>
 
-#include "ClientConnection.h"
-#include "ClientImpl.h"
-#include "Commands.h"
+#include "ConsumerImpl.h"
 #include "ExecutorService.h"
-#include "HandlerBase.h"
 #include "MessageIdUtil.h"
 
 namespace pulsar {
@@ -45,7 +42,10 @@ static int compare(const MessageId& lhs, const MessageId& 
rhs) {
     }
 }
 
-void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
+void AckGroupingTrackerEnabled::start(const ConsumerImplPtr& consumer) {
+    AckGroupingTracker::start(consumer);
+    this->scheduleTimer();
+}
 
 bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
     {
@@ -62,6 +62,13 @@ bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& 
msgId) {
 }
 
 void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId, const 
ResultCallback& callback) {
+    auto consumer = consumer_.lock();
+    if (!consumer || consumer->isClosingOrClosed()) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
     this->pendingIndividualAcks_.insert(msgId);
     if (waitResponse_) {
@@ -70,12 +77,19 @@ void AckGroupingTrackerEnabled::addAcknowledge(const 
MessageId& msgId, const Res
         callback(ResultOk);
     }
     if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() 
>= this->ackGroupingMaxSize_) {
-        this->flush();
+        this->flush(consumer);
     }
 }
 
 void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList& msgIds,
                                                    const ResultCallback& 
callback) {
+    auto consumer = consumer_.lock();
+    if (!consumer || consumer->isClosingOrClosed()) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
     for (const auto& msgId : msgIds) {
         this->pendingIndividualAcks_.emplace(msgId);
@@ -86,12 +100,19 @@ void AckGroupingTrackerEnabled::addAcknowledgeList(const 
MessageIdList& msgIds,
         callback(ResultOk);
     }
     if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() 
>= this->ackGroupingMaxSize_) {
-        this->flush();
+        this->flush(consumer);
     }
 }
 
 void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& 
msgId,
                                                          const ResultCallback& 
callback) {
+    auto consumer = consumer_.lock();
+    if (!consumer || consumer->isClosingOrClosed()) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
     std::unique_lock<std::mutex> lock(this->mutexCumulativeAckMsgId_);
     bool completeCallback = true;
     if (compare(msgId, this->nextCumulativeAckMsgId_) > 0) {
@@ -115,23 +136,28 @@ void 
AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId,
         callback(ResultOk);
     }
 }
-
 AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
-    isClosed_ = true;
-    this->flush();
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     if (this->timer_) {
         cancelTimer(*this->timer_);
     }
 }
 
-void AckGroupingTrackerEnabled::flush() {
+void AckGroupingTrackerEnabled::close() {
+    flushAndClean();
+    std::lock_guard<std::mutex> lock(this->mutexTimer_);
+    if (this->timer_) {
+        cancelTimer(*this->timer_);
+    }
+}
+
+void AckGroupingTrackerEnabled::flush(const ConsumerImplPtr& consumer) {
     // Send ACK for cumulative ACK requests.
     {
         std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
         if (this->requireCumulativeAck_) {
-            this->doImmediateAck(this->nextCumulativeAckMsgId_, 
this->latestCumulativeCallback_,
-                                 CommandAck_AckType_Cumulative);
+            consumer->doImmediateAck(this->nextCumulativeAckMsgId_, 
this->latestCumulativeCallback_,
+                                     CommandAck_AckType_Cumulative);
             this->latestCumulativeCallback_ = nullptr;
             this->requireCumulativeAck_ = false;
         }
@@ -147,13 +173,17 @@ void AckGroupingTrackerEnabled::flush() {
                 callback(result);
             }
         };
-        this->doImmediateAck(this->pendingIndividualAcks_, callback);
+        consumer->doImmediateAck(this->pendingIndividualAcks_, callback);
         this->pendingIndividualAcks_.clear();
     }
 }
 
 void AckGroupingTrackerEnabled::flushAndClean() {
-    this->flush();
+    auto consumer = consumer_.lock();
+    if (!consumer) {
+        return;
+    }
+    this->flush(consumer);
     {
         std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
         this->nextCumulativeAckMsgId_ = MessageId::earliest();
@@ -165,10 +195,6 @@ void AckGroupingTrackerEnabled::flushAndClean() {
 }
 
 void AckGroupingTrackerEnabled::scheduleTimer() {
-    if (isClosed_) {
-        return;
-    }
-
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     this->timer_ = this->executor_->createDeadlineTimer();
     this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, 
this->ackGroupingTimeMs_)));
@@ -176,7 +202,11 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
     this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
         auto self = weakSelf.lock();
         if (self && !ec) {
-            this->flush();
+            auto consumer = consumer_.lock();
+            if (!consumer || consumer->isClosingOrClosed()) {
+                return;
+            }
+            this->flush(consumer);
             this->scheduleTimer();
         }
     });
diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h
index 5eb04b9..eb2b449 100644
--- a/lib/AckGroupingTrackerEnabled.h
+++ b/lib/AckGroupingTrackerEnabled.h
@@ -21,8 +21,6 @@
 
 #include <pulsar/MessageId.h>
 
-#include <atomic>
-#include <cstdint>
 #include <mutex>
 #include <set>
 
@@ -35,9 +33,6 @@ class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-class HandlerBase;
-using HandlerBasePtr = std::shared_ptr<HandlerBase>;
-using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
 
 /**
  * @class AckGroupingTrackerEnabled
@@ -45,34 +40,31 @@ using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
  */
 class AckGroupingTrackerEnabled : public AckGroupingTracker {
    public:
-    AckGroupingTrackerEnabled(const std::function<ClientConnectionPtr()>& 
connectionSupplier,
-                              const std::function<uint64_t()>& 
requestIdSupplier, uint64_t consumerId,
-                              bool waitResponse, long ackGroupingTimeMs, long 
ackGroupingMaxSize,
+    AckGroupingTrackerEnabled(long ackGroupingTimeMs, long ackGroupingMaxSize, 
bool waitResponse,
                               const ExecutorServicePtr& executor)
-        : AckGroupingTracker(connectionSupplier, requestIdSupplier, 
consumerId, waitResponse),
-          ackGroupingTimeMs_(ackGroupingTimeMs),
+        : ackGroupingTimeMs_(ackGroupingTimeMs),
           ackGroupingMaxSize_(ackGroupingMaxSize),
+          waitResponse_(waitResponse),
           executor_(executor) {
         pendingIndividualCallbacks_.reserve(ackGroupingMaxSize);
     }
 
     ~AckGroupingTrackerEnabled();
 
-    void start() override;
+    void start(const ConsumerImplPtr& consumer) override;
     bool isDuplicate(const MessageId& msgId) override;
     void addAcknowledge(const MessageId& msgId, const ResultCallback& 
callback) override;
     void addAcknowledgeList(const MessageIdList& msgIds, const ResultCallback& 
callback) override;
     void addAcknowledgeCumulative(const MessageId& msgId, const 
ResultCallback& callback) override;
-    void flush();
     void flushAndClean() override;
+    void close() override;
+
+   private:
+    void flush(const ConsumerImplPtr& consumer);
 
    protected:
-    //! Method for scheduling grouping timer.
     void scheduleTimer();
 
-    //! State
-    std::atomic_bool isClosed_{false};
-
     //! Next message ID to be cumulatively cumulatively.
     MessageId nextCumulativeAckMsgId_{MessageId::earliest()};
     bool requireCumulativeAck_{false};
@@ -90,6 +82,8 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
     //! Max number of ACK requests can be grouped.
     const long ackGroupingMaxSize_;
 
+    const bool waitResponse_;
+
     //! ACK request sender's scheduled executor.
     const ExecutorServicePtr executor_;
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 325adda..92d25cb 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -74,6 +74,22 @@ static boost::optional<MessageId> getStartMessageId(const 
boost::optional<Messag
     return startMessageId;
 }
 
+static std::shared_ptr<AckGroupingTracker> newAckGroupingTracker(const 
std::string& topic,
+                                                                 const 
ConsumerConfiguration& config,
+                                                                 const 
ClientImplPtr& client) {
+    if (TopicName::get(topic)->isPersistent()) {
+        if (config.getAckGroupingTimeMs() > 0) {
+            return std::make_shared<AckGroupingTrackerEnabled>(
+                config.getAckGroupingTimeMs(), config.getAckGroupingMaxSize(), 
config.isAckReceiptEnabled(),
+                client->getIOExecutorProvider()->get());
+        } else {
+            return std::make_shared<AckGroupingTrackerDisabled>();
+        }
+    } else {
+        return std::make_shared<AckGroupingTracker>();
+    }
+}
+
 ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& 
topic,
                            const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
                            bool isPersistent, const ConsumerInterceptorsPtr& 
interceptors,
@@ -105,12 +121,14 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, 
const std::string& topic
       consumerStr_("[" + topic + ", " + subscriptionName + ", " + 
std::to_string(consumerId_) + "] "),
       messageListenerRunning_(!conf.isStartPaused()),
       negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, 
*this, conf)),
+      ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
       readCompacted_(conf.isReadCompacted()),
       startMessageId_(getStartMessageId(startMessageId, 
conf.isStartMessageIdInclusive())),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
       
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
       
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
-      interceptors_(interceptors) {
+      interceptors_(interceptors),
+      requestIdGenerator_(client->getRequestIdGenerator()) {
     // Initialize un-ACKed messages OT tracker.
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
         if (conf.getTickDurationInMs() > 0) {
@@ -169,9 +187,8 @@ ConsumerImpl::~ConsumerImpl() {
         LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly 
closed");
 
         ClientConnectionPtr cnx = getCnx().lock();
-        ClientImplPtr client = client_.lock();
-        if (client && cnx) {
-            int requestId = client->newRequestId();
+        if (cnx) {
+            auto requestId = newRequestId();
             cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId);
             cnx->removeConsumer(consumerId_);
             LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << 
consumerId_);
@@ -186,8 +203,6 @@ void ConsumerImpl::setPartitionIndex(int partitionIndex) { 
partitionIndex_ = par
 
 int ConsumerImpl::getPartitionIndex() { return partitionIndex_; }
 
-uint64_t ConsumerImpl::getConsumerId() { return consumerId_; }
-
 Future<Result, ConsumerImplBaseWeakPtr> 
ConsumerImpl::getConsumerCreatedFuture() {
     return consumerCreatedPromise_.getFuture();
 }
@@ -198,38 +213,7 @@ const std::string& ConsumerImpl::getTopic() const { return 
topic(); }
 
 void ConsumerImpl::start() {
     HandlerBase::start();
-
-    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
-    auto connectionSupplier = [weakSelf]() -> ClientConnectionPtr {
-        auto self = weakSelf.lock();
-        if (!self) {
-            return nullptr;
-        }
-        return self->getCnx().lock();
-    };
-
-    // NOTE: start() is always called in `ClientImpl`'s method, so lock() 
returns not null
-    const auto requestIdGenerator = client_.lock()->getRequestIdGenerator();
-    const auto requestIdSupplier = [requestIdGenerator] { return 
(*requestIdGenerator)++; };
-
-    // Initialize ackGroupingTrackerPtr_ here because the 
get_shared_this_ptr() was not initialized until the
-    // constructor completed.
-    if (TopicName::get(topic())->isPersistent()) {
-        if (config_.getAckGroupingTimeMs() > 0) {
-            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
-                connectionSupplier, requestIdSupplier, consumerId_, 
config_.isAckReceiptEnabled(),
-                config_.getAckGroupingTimeMs(), 
config_.getAckGroupingMaxSize(),
-                client_.lock()->getIOExecutorProvider()->get()));
-        } else {
-            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(
-                connectionSupplier, requestIdSupplier, consumerId_, 
config_.isAckReceiptEnabled()));
-        }
-    } else {
-        LOG_INFO(getName() << "ACK will NOT be sent to broker for this 
non-persistent topic.");
-        ackGroupingTrackerPtr_.reset(new 
AckGroupingTracker(connectionSupplier, requestIdSupplier,
-                                                            consumerId_, 
config_.isAckReceiptEnabled()));
-    }
-    ackGroupingTrackerPtr_->start();
+    ackGroupingTrackerPtr_->start(get_shared_this_ptr());
 }
 
 void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { 
cnx.removeConsumer(consumerId_); }
@@ -265,7 +249,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     unAckedMessageTrackerPtr_->clear();
 
     ClientImplPtr client = client_.lock();
-    long requestId = client->newRequestId();
+    auto requestId = newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
         topic(), subscription_, consumerId_, requestId, getSubType(), 
getConsumerName(), subscriptionMode_,
         subscribeMessageId, readCompacted_, config_.getProperties(), 
config_.getSubscriptionProperties(),
@@ -344,7 +328,7 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
             // Creating the consumer has timed out. We need to ensure the 
broker closes the consumer
             // in case it was indeed created, otherwise it might prevent new 
subscribe operation,
             // since we are not closing the connection
-            int requestId = client_.lock()->newRequestId();
+            auto requestId = newRequestId();
             cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId);
         }
 
@@ -396,7 +380,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& 
originalCallback) {
         LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << 
consumerId_);
         ClientImplPtr client = client_.lock();
         lock.unlock();
-        int requestId = client->newRequestId();
+        auto requestId = newRequestId();
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
         auto self = get_shared_this_ptr();
         cnx->sendRequestWithId(cmd, requestId)
@@ -591,17 +575,16 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
     LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
                         << metadata.has_num_messages_in_batch());
 
-    uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
-    auto ackGroupingTrackerPtr = ackGroupingTrackerPtr_;
-    if (ackGroupingTrackerPtr == nullptr) {  // The consumer is closing
+    const auto state = state_.load(std::memory_order_relaxed);
+    if (state == Closing || state == Closed) {
         return;
     }
-    if (ackGroupingTrackerPtr->isDuplicate(m.getMessageId())) {
+    uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
+    if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
         LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by 
same consumer.");
         increaseAvailablePermits(cnx, numOfMessageReceived);
         return;
     }
-    ackGroupingTrackerPtr.reset();
 
     if (metadata.has_num_messages_in_batch()) {
         BitSet::Data words(msg.ack_set_size());
@@ -1340,12 +1323,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& 
originalCallback) {
     incomingMessages_.close();
 
     // Flush pending grouped ACK requests.
-    if (ackGroupingTrackerPtr_.use_count() != 1) {
-        LOG_ERROR("AckGroupingTracker is shared by other "
-                  << (ackGroupingTrackerPtr_.use_count() - 1)
-                  << " threads, which will prevent flushing the ACKs");
-    }
-    ackGroupingTrackerPtr_.reset();
+    ackGroupingTrackerPtr_->close();
     negativeAcksTracker_->close();
 
     ClientConnectionPtr cnx = getCnx().lock();
@@ -1364,7 +1342,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& 
originalCallback) {
 
     cancelTimers();
 
-    int requestId = client->newRequestId();
+    auto requestId = newRequestId();
     auto self = get_shared_this_ptr();
     cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId)
         .addListener([self, callback](Result result, const ResponseData&) { 
callback(result); });
@@ -1375,7 +1353,7 @@ const std::string& ConsumerImpl::getName() const { return 
consumerStr_; }
 void ConsumerImpl::shutdown() { internalShutdown(); }
 
 void ConsumerImpl::internalShutdown() {
-    ackGroupingTrackerPtr_.reset();
+    ackGroupingTrackerPtr_->close();
     incomingMessages_.clear();
     possibleSendToDeadLetterTopicMessages_.clear();
     resetCnx();
@@ -1499,8 +1477,7 @@ void ConsumerImpl::getBrokerConsumerStatsAsync(const 
BrokerConsumerStatsCallback
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         if (cnx->getServerProtocolVersion() >= proto::v8) {
-            ClientImplPtr client = client_.lock();
-            uint64_t requestId = client->newRequestId();
+            auto requestId = newRequestId();
             LOG_DEBUG(getName() << " Sending ConsumerStats Command for 
Consumer - " << getConsumerId()
                                 << ", requestId - " << requestId);
 
@@ -1542,12 +1519,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, 
const ResultCallback& callb
         return;
     }
 
-    ClientImplPtr client = client_.lock();
-    if (!client) {
-        LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId);
-        return;
-    }
-    const auto requestId = client->newRequestId();
+    const auto requestId = newRequestId();
     seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
msgId), SeekArg{msgId}, callback);
 }
 
@@ -1561,12 +1533,7 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const 
ResultCallback& callback)
         return;
     }
 
-    ClientImplPtr client = client_.lock();
-    if (!client) {
-        LOG_ERROR(getName() << "Client is expired when seekAsync " << 
timestamp);
-        return;
-    }
-    const auto requestId = client->newRequestId();
+    const auto requestId = newRequestId();
     seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
timestamp), SeekArg{timestamp},
                       callback);
 }
@@ -1658,8 +1625,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const 
BackoffPtr& backoff, Time
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         if (cnx->getServerProtocolVersion() >= proto::v12) {
-            ClientImplPtr client = client_.lock();
-            uint64_t requestId = client->newRequestId();
+            auto requestId = newRequestId();
             LOG_DEBUG(getName() << " Sending getLastMessageId Command for 
Consumer - " << getConsumerId()
                                 << ", requestId - " << requestId);
 
@@ -1926,4 +1892,100 @@ void ConsumerImpl::processPossibleToDLQ(const 
MessageId& messageId, const Proces
     }
 }
 
+void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const 
MessageId& msgId,
+                                  CommandAck_AckType ackType, const 
ResultCallback& callback) {
+    const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
+    if (config_.isAckReceiptEnabled()) {
+        auto requestId = newRequestId();
+        cnx->sendRequestWithId(
+               Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType, requestId),
+               requestId)
+            .addListener([callback](Result result, const ResponseData&) {
+                if (callback) {
+                    callback(result);
+                }
+            });
+    } else {
+        cnx->sendCommand(Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType));
+        if (callback) {
+            callback(ResultOk);
+        }
+    }
+}
+
+void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const 
std::set<MessageId>& msgIds,
+                                  const ResultCallback& callback) {
+    std::set<MessageId> ackMsgIds;
+
+    for (const auto& msgId : msgIds) {
+        if (auto chunkMessageId =
+                
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId)))
 {
+            auto msgIdList = chunkMessageId->getChunkedMessageIds();
+            ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
+        } else {
+            ackMsgIds.insert(msgId);
+        }
+    }
+    if 
(Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion()))
 {
+        if (config_.isAckReceiptEnabled()) {
+            auto requestId = newRequestId();
+            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds, requestId), requestId)
+                .addListener([callback](Result result, const ResponseData&) {
+                    if (callback) {
+                        callback(result);
+                    }
+                });
+        } else {
+            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds));
+            if (callback) {
+                callback(ResultOk);
+            }
+        }
+    } else {
+        auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
+        auto wrappedCallback = [callback, count](Result result) {
+            if (--*count == 0 && callback) {
+                callback(result);
+            }
+        };
+        for (auto&& msgId : ackMsgIds) {
+            doImmediateAck(msgId, wrappedCallback, 
CommandAck_AckType_Individual);
+        }
+    }
+}
+
+void ConsumerImpl::doImmediateAck(const MessageId& msgId, const 
ResultCallback& callback,
+                                  CommandAck_AckType ackType) {
+    const auto cnx = getCnx().lock();
+    if (!cnx) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
+    if (ackType == CommandAck_AckType_Individual) {
+        // If it's individual ack, we need to acknowledge all message IDs in a 
chunked message Id
+        // If it's cumulative ack, we only need to ack the last message ID of 
a chunked message.
+        // ChunkedMessageId return last chunk message ID by default, so we 
don't need to handle it.
+        if (auto chunkMessageId =
+                
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId)))
 {
+            auto msgIdList = chunkMessageId->getChunkedMessageIds();
+            doImmediateAck(cnx, std::set<MessageId>(msgIdList.begin(), 
msgIdList.end()), callback);
+            return;
+        }
+    }
+    doImmediateAck(cnx, msgId, ackType, callback);
+}
+
+void ConsumerImpl::doImmediateAck(const std::set<MessageId>& msgIds, const 
ResultCallback& callback) {
+    const auto cnx = getCnx().lock();
+    if (!cnx) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
+    doImmediateAck(cnx, msgIds, callback);
+}
+
 } /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 055b487..5e06723 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -27,6 +27,7 @@
 #include <functional>
 #include <list>
 #include <memory>
+#include <set>
 #include <utility>
 
 #include "BrokerConsumerStatsImpl.h"
@@ -96,7 +97,7 @@ class ConsumerImpl : public ConsumerImplBase {
     void setPartitionIndex(int partitionIndex);
     int getPartitionIndex();
     void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int 
numMessages);
-    uint64_t getConsumerId();
+    uint64_t getConsumerId() const noexcept { return consumerId_; }
     void messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
                          bool& isChecksumValid, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
                          proto::MessageMetadata& msgMetadata, SharedBuffer& 
payload);
@@ -124,6 +125,10 @@ class ConsumerImpl : public ConsumerImplBase {
     void shutdown() override;
     void internalShutdown();
     bool isClosed() override;
+    bool isClosingOrClosed() const noexcept {
+        const auto state = state_.load(std::memory_order_relaxed);
+        return state == Closing || state == Closed;
+    }
     bool isOpen() override;
     Result pauseMessageListener() override;
     Result resumeMessageListener() override;
@@ -152,6 +157,9 @@ class ConsumerImpl : public ConsumerImplBase {
     void beforeConnectionChange(ClientConnection& cnx) override;
     void onNegativeAcksSend(const std::set<MessageId>& messageIds);
 
+    void doImmediateAck(const MessageId& msgId, const ResultCallback& 
callback, CommandAck_AckType ackType);
+    void doImmediateAck(const std::set<MessageId>& msgIds, const 
ResultCallback& callback);
+
    protected:
     // overrided methods from HandlerBase
     Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) 
override;
@@ -237,7 +245,7 @@ class ConsumerImpl : public ConsumerImplBase {
     std::queue<ReceiveCallback> pendingReceives_;
     std::atomic_int availablePermits_;
     const int receiverQueueRefillThreshold_;
-    uint64_t consumerId_;
+    const uint64_t consumerId_;
     const std::string consumerStr_;
     int32_t partitionIndex_ = -1;
     Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
@@ -246,7 +254,7 @@ class ConsumerImpl : public ConsumerImplBase {
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
     std::shared_ptr<NegativeAcksTracker> negativeAcksTracker_;
-    AckGroupingTrackerPtr ackGroupingTrackerPtr_;
+    const AckGroupingTrackerPtr ackGroupingTrackerPtr_;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
@@ -340,6 +348,9 @@ class ConsumerImpl : public ConsumerImplBase {
     std::atomic_bool expireChunkMessageTaskScheduled_{false};
 
     ConsumerInterceptorsPtr interceptors_;
+    const std::shared_ptr<std::atomic<uint64_t>> requestIdGenerator_;
+
+    uint64_t newRequestId() const { return (*requestIdGenerator_)++; }
 
     void triggerCheckExpiredChunkedTimer();
     void discardChunkMessages(const std::string& uuid, const MessageId& 
messageId, bool autoAck);
@@ -379,6 +390,11 @@ class ConsumerImpl : public ConsumerImplBase {
         }
     }
 
+    void doImmediateAck(const ClientConnectionPtr& cnx, const MessageId& 
msgId, CommandAck_AckType ackType,
+                        const ResultCallback& callback);
+    void doImmediateAck(const ClientConnectionPtr& cnx, const 
std::set<MessageId>& msgIds,
+                        const ResultCallback& callback);
+
     friend class PulsarFriend;
     friend class MultiTopicsConsumerImpl;
 
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
index 0e2183e..464d5d2 100644
--- a/tests/AcknowledgeTest.cc
+++ b/tests/AcknowledgeTest.cc
@@ -375,6 +375,34 @@ TEST_F(AcknowledgeTest, testAckReceiptEnabled) {
     client.close();
 }
 
+TEST_F(AcknowledgeTest, testCloseConsumer) {
+    Client client(lookupUrl);
+    const auto topic = "test-close-consumer" + unique_str();
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setAckGroupingTimeMs(60000);
+    consumerConfig.setAckGroupingMaxSize(100);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumerConfig, 
consumer));
+
+    producer.send(MessageBuilder().setContent("msg-0").build());
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+    consumer.acknowledgeAsync(
+        msg, nullptr);  // it just adds the msg id to the pending ack list due 
to the ack grouping configs
+    consumer.close();   // it will flush the pending ACK and prevent any 
further ack
+    ASSERT_EQ(ResultAlreadyClosed, consumer.acknowledge(msg));
+    ASSERT_EQ(ResultAlreadyClosed, consumer.acknowledgeCumulative(msg));
+    ASSERT_EQ(ResultAlreadyClosed, 
consumer.acknowledge(std::vector<MessageId>{msg.getMessageId()}));
+
+    producer.send(MessageBuilder().setContent("msg-1").build());
+    // Recreate the consumer to verify the first message is acknowledged
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumerConfig, 
consumer));
+    ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+    ASSERT_EQ("msg-1", msg.getDataAsString());
+}
+
 INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
                          testing::Combine(testing::Values(100, 0), 
testing::Values(true, false)),
                          [](const testing::TestParamInfo<std::tuple<int, 
bool>>& info) {
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 4330609..5cf478b 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -38,7 +38,6 @@
 #include "lib/AckGroupingTrackerEnabled.h"
 #include "lib/ClientConnection.h"
 #include "lib/ClientImpl.h"
-#include "lib/Commands.h"
 #include "lib/ConsumerImpl.h"
 #include "lib/Future.h"
 #include "lib/Latch.h"
@@ -3633,7 +3632,7 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDefaultBehavior) {
     ASSERT_EQ(configConsumer.getAckGroupingTimeMs(), 100);
     ASSERT_EQ(configConsumer.getAckGroupingMaxSize(), 1000);
 
-    AckGroupingTracker tracker{nullptr, nullptr, 0, false};
+    AckGroupingTracker tracker;
     Message msg;
     ASSERT_FALSE(tracker.isDuplicate(msg.getMessageId()));
 }
@@ -3672,10 +3671,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerSingleAckBehavior) {
 
     // Send ACK.
     auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
-    AckGroupingTrackerDisabled tracker([&consumerImpl]() { return 
consumerImpl.getCnx().lock(); },
-                                       [&clientImplPtr] { return 
clientImplPtr->newRequestId(); },
-                                       consumerImpl.getConsumerId(), false);
-    tracker.start();
+    AckGroupingTrackerDisabled tracker;
+    tracker.start(PulsarFriend::getConsumerImplPtr(consumer));
     for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
         auto connPtr = connWeakPtr.lock();
         ASSERT_NE(connPtr, nullptr);
@@ -3707,8 +3704,6 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerMultiAckBehavior) {
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
 
-    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
-
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
         Message msg = MessageBuilder().setContent(std::string("MSG-") + 
std::to_string(count)).build();
@@ -3724,10 +3719,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerMultiAckBehavior) {
 
     // Send ACK.
     auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
-    AckGroupingTrackerDisabled tracker([&consumerImpl]() { return 
consumerImpl.getCnx().lock(); },
-                                       [&clientImplPtr] { return 
clientImplPtr->newRequestId(); },
-                                       consumerImpl.getConsumerId(), false);
-    tracker.start();
+    AckGroupingTrackerDisabled tracker;
+    tracker.start(PulsarFriend::getConsumerImplPtr(consumer));
     tracker.addAcknowledgeList(recvMsgId, nullptr);
     consumer.close();
 
@@ -3755,7 +3748,6 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledIndividualAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3771,8 +3763,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledIndividualAck) {
     }
 
     // Send ACK.
-    AckGroupingTrackerDisabled tracker([&consumerImpl] { return 
consumerImpl.getCnx().lock(); }, nullptr,
-                                       consumerImpl.getConsumerId(), false);
+    AckGroupingTrackerDisabled tracker;
+    tracker.start(PulsarFriend::getConsumerImplPtr(consumer));
     for (auto &msgId : recvMsgId) {
         tracker.addAcknowledge(msgId, nullptr);
     }
@@ -3802,7 +3794,6 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledCumulativeAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3818,8 +3809,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledCumulativeAck) {
     }
 
     // Send ACK.
-    AckGroupingTrackerDisabled tracker([&consumerImpl] { return 
consumerImpl.getCnx().lock(); }, nullptr,
-                                       consumerImpl.getConsumerId(), false);
+    AckGroupingTrackerDisabled tracker;
+    tracker.start(PulsarFriend::getConsumerImplPtr(consumer));
     auto &latestMsgId = *std::max_element(recvMsgId.begin(), recvMsgId.end());
     tracker.addAcknowledgeCumulative(latestMsgId, nullptr);
     consumer.close();
@@ -3861,7 +3852,6 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledIndividualAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3877,9 +3867,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledIndividualAck) {
     }
 
     auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
-        [&consumerImpl] { return consumerImpl->getCnx().lock(); }, nullptr, 
consumerImpl->getConsumerId(),
-        false, ackGroupingTimeMs, ackGroupingMaxSize, 
clientImplPtr->getIOExecutorProvider()->get());
-    tracker->start();
+        ackGroupingTimeMs, ackGroupingMaxSize, false, 
clientImplPtr->getIOExecutorProvider()->get());
+    tracker->start(PulsarFriend::getConsumerImplPtr(consumer));
     ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
     ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
     ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
@@ -3939,9 +3928,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledCumulativeAck) {
     std::sort(recvMsgId.begin(), recvMsgId.end());
 
     auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
-        [&consumerImpl0] { return consumerImpl0->getCnx().lock(); }, nullptr, 
consumerImpl0->getConsumerId(),
-        false, ackGroupingTimeMs, ackGroupingMaxSize, 
clientImplPtr->getIOExecutorProvider()->get());
-    tracker0->start();
+        ackGroupingTimeMs, ackGroupingMaxSize, false, 
clientImplPtr->getIOExecutorProvider()->get());
+    tracker0->start(PulsarFriend::getConsumerImplPtr(consumer));
     ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
     ASSERT_FALSE(tracker0->requireCumulativeAck());
 
@@ -3976,11 +3964,10 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledCumulativeAck) {
     auto ret = consumer.receive(msg, 1000);
     ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << 
msg.getDataAsString();
     auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
-        [&consumerImpl1] { return consumerImpl1->getCnx().lock(); }, nullptr, 
consumerImpl1->getConsumerId(),
-        false, ackGroupingTimeMs, ackGroupingMaxSize, 
clientImplPtr->getIOExecutorProvider()->get());
-    tracker1->start();
+        ackGroupingTimeMs, ackGroupingMaxSize, false, 
clientImplPtr->getIOExecutorProvider()->get());
+    tracker1->start(PulsarFriend::getConsumerImplPtr(consumer));
     tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1], nullptr);
-    tracker1.reset();
+    tracker1->close();
     consumer.close();
 
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index dfbc276..3aa1dd3 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -25,6 +25,7 @@
 #include <atomic>
 #include <chrono>
 #include <ctime>
+#include <future>
 #include <map>
 #include <mutex>
 #include <set>
@@ -1542,12 +1543,19 @@ TEST(ConsumerTest, 
testConsumerListenerShouldNotSegfaultAfterClose) {
     consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
     Latch latchFirstReceiveMsg(1);
     Latch latchAfterClosed(1);
-    consumerConfig.setMessageListener(
-        [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const 
Message& msg) {
-            latchFirstReceiveMsg.countdown();
-            LOG_INFO("Consume message: " << msg.getDataAsString());
-            latchAfterClosed.wait();
-        });
+
+    std::promise<std::vector<Result>> ackResultsPromise;
+    consumerConfig.setMessageListener([&latchFirstReceiveMsg, 
&latchAfterClosed, &ackResultsPromise](
+                                          Consumer consumer, const Message& 
msg) {
+        latchFirstReceiveMsg.countdown();
+        LOG_INFO("Consume message: " << msg.getDataAsString());
+        latchAfterClosed.wait();
+        std::vector<Result> results(3);
+        results[0] = consumer.acknowledge(msg);
+        results[1] = consumer.acknowledgeCumulative(msg);
+        results[2] = 
consumer.acknowledge(std::vector<MessageId>{msg.getMessageId()});
+        ackResultsPromise.set_value(results);
+    });
     auto result = client.subscribe(topicName, "test-sub", consumerConfig, 
consumer);
     ASSERT_EQ(ResultOk, result);
 
@@ -1555,6 +1563,11 @@ TEST(ConsumerTest, 
testConsumerListenerShouldNotSegfaultAfterClose) {
     latchFirstReceiveMsg.wait();
     ASSERT_EQ(ResultOk, consumer.close());
     latchAfterClosed.countdown();
+    const auto ackResults = ackResultsPromise.get_future().get();
+    ASSERT_EQ(3, ackResults.size());
+    for (size_t i = 0; i < ackResults.size(); i++) {
+        ASSERT_EQ(ResultAlreadyClosed, ackResults[i]) << "ack result[" << i << 
"] " << ackResults[i];
+    }
 
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());

Reply via email to