This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f91575  Support waiting for the ACK response (#232)
1f91575 is described below

commit 1f9157554fc42c9445185b934e17d9a5fc898f7a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 29 17:45:12 2023 +0800

    Support waiting for the ACK response (#232)
    
    * Support waiting for the ACK response
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/102
    
    ### Modifications
    
    Add a consumer configuration to set `ackReceiptEnabled`. If it's true,
    when sending a CommandAck request, generate the request id and set the
    `request_id` field so that the broker will respond with a
    `CommandAckResponse`. Here we pass the shared pointer to an atomic
    integer so that we no longer need to hold a weak pointer to
    `ClientImpl`.
    
    Pass the user-defined callback to the ACK grouping tracker. When
    `ackReceiptEnabled` is true, trigger the callback only after receiving
    the ACK response.
    
    ### Verifications
    
    - Add `testAckReceiptEnabled` to verify that when `ackReceiptEnabled` is
      true, the callback will be triggered after `ackGroupingMaxTime`.
    - Support configuring `ackReceiptEnabled` for existing `TEST_P`
      parameterized tests in `AcknowledgeTest`.
    
    * Use multiple overloads for newAck
    
    * Fix callback is not called
---
 include/pulsar/ConsumerConfiguration.h | 16 ++++++
 lib/AckGroupingTracker.cc              | 89 ++++++++++++++++++++++------------
 lib/AckGroupingTracker.h               | 51 ++++++++++---------
 lib/AckGroupingTrackerDisabled.cc      | 20 +++-----
 lib/AckGroupingTrackerDisabled.h       | 21 ++------
 lib/AckGroupingTrackerEnabled.cc       | 88 ++++++++++++++++-----------------
 lib/AckGroupingTrackerEnabled.h        | 44 ++++++++---------
 lib/ClientConnection.cc                | 26 ++++++++++
 lib/ClientConnection.h                 |  2 +
 lib/ClientImpl.cc                      |  6 +--
 lib/ClientImpl.h                       |  4 +-
 lib/Commands.cc                        | 54 +++++++++++++++++----
 lib/Commands.h                         |  7 +++
 lib/ConsumerConfiguration.cc           |  7 +++
 lib/ConsumerConfigurationImpl.h        |  1 +
 lib/ConsumerImpl.cc                    | 45 +++++++++++------
 tests/AcknowledgeTest.cc               | 73 ++++++++++++++++++++++++++--
 tests/BasicEndToEndTest.cc             | 80 +++++++++++-------------------
 tests/ConsumerConfigurationTest.cc     |  4 ++
 19 files changed, 394 insertions(+), 244 deletions(-)

diff --git a/include/pulsar/ConsumerConfiguration.h 
b/include/pulsar/ConsumerConfiguration.h
index ee0c634..0e6634d 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -629,6 +629,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
 
     const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;
 
+    /**
+     * Whether to receive the ACK receipt from broker.
+     *
+     * By default, when Consumer::acknowledge is called, it won't wait until 
the corresponding response from
+     * broker. After it's enabled, the `acknowledge` method will return a 
Result that indicates if the
+     * acknowledgment succeeded.
+     *
+     * Default: false
+     */
+    ConsumerConfiguration& setAckReceiptEnabled(bool ackReceiptEnabled);
+
+    /**
+     * The associated getter of setAckReceiptEnabled.
+     */
+    bool isAckReceiptEnabled() const;
+
     friend class PulsarWrapper;
     friend class PulsarFriend;
 
diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 064d4ea..9a47135 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -19,6 +19,9 @@
 
 #include "AckGroupingTracker.h"
 
+#include <atomic>
+#include <limits>
+
 #include "BitSet.h"
 #include "ClientConnection.h"
 #include "Commands.h"
@@ -29,24 +32,33 @@ namespace pulsar {
 
 DECLARE_LOG_OBJECT();
 
-inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const 
MessageId& msgId,
-                    CommandAck_AckType ackType) {
-    const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
-    auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), 
bitSet, ackType, -1);
-    cnx->sendCommand(cmd);
-    LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", 
" << msgId.entryId() << "]");
-}
-
-bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, 
uint64_t consumerId,
-                                        const MessageId& msgId, 
CommandAck_AckType ackType) {
-    auto cnx = connWeakPtr.lock();
-    if (cnx == nullptr) {
-        LOG_DEBUG("Connection is not ready, ACK failed for message - [" << 
msgId.ledgerId() << ", "
-                                                                        << 
msgId.entryId() << "]");
-        return false;
+void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback 
callback,
+                                        CommandAck_AckType ackType) const {
+    const auto cnx = connectionSupplier_();
+    if (!cnx) {
+        LOG_DEBUG("Connection is not ready, ACK failed for " << msgId);
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
+    const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
+    if (waitResponse_) {
+        const auto requestId = requestIdSupplier_();
+        cnx->sendRequestWithId(
+               Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType, requestId),
+               requestId)
+            .addListener([callback](Result result, const ResponseData&) {
+                if (callback) {
+                    callback(result);
+                }
+            });
+    } else {
+        cnx->sendCommand(Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType));
+        if (callback) {
+            callback(ResultOk);
+        }
     }
-    sendAck(cnx, consumerId, msgId, ackType);
-    return true;
 }
 
 static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& 
msgIds) {
@@ -62,25 +74,42 @@ static std::ostream& operator<<(std::ostream& os, const 
std::set<MessageId>& msg
     return os;
 }
 
-bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, 
uint64_t consumerId,
-                                        const std::set<MessageId>& msgIds) {
-    auto cnx = connWeakPtr.lock();
-    if (cnx == nullptr) {
-        LOG_DEBUG("Connection is not ready, ACK failed.");
-        return false;
+void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, 
ResultCallback callback) const {
+    const auto cnx = connectionSupplier_();
+    if (!cnx) {
+        LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
     }
 
     if 
(Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion()))
 {
-        auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
-        cnx->sendCommand(cmd);
-        LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " 
<< msgIds);
+        if (waitResponse_) {
+            const auto requestId = requestIdSupplier_();
+            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
msgIds, requestId), requestId)
+                .addListener([callback](Result result, const ResponseData&) {
+                    if (callback) {
+                        callback(result);
+                    }
+                });
+        } else {
+            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, 
msgIds));
+            if (callback) {
+                callback(ResultOk);
+            }
+        }
     } else {
-        // Broker does not support multi-message ACK, use multiple individual 
ACKs instead.
-        for (const auto& msgId : msgIds) {
-            sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
+        auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
+        auto wrappedCallback = [callback, count](Result result) {
+            if (--*count == 0 && callback) {
+                callback(result);
+            }
+        };
+        for (auto&& msgId : msgIds) {
+            doImmediateAck(msgId, wrappedCallback, 
CommandAck_AckType_Individual);
         }
     }
-    return true;
 }
 
 }  // namespace pulsar
diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h
index 0a986a0..2b48142 100644
--- a/lib/AckGroupingTracker.h
+++ b/lib/AckGroupingTracker.h
@@ -20,8 +20,10 @@
 #define LIB_ACKGROUPINGTRACKER_H_
 
 #include <pulsar/MessageId.h>
+#include <pulsar/Result.h>
 
 #include <cstdint>
+#include <functional>
 #include <set>
 
 #include "ProtoApiEnums.h"
@@ -29,7 +31,9 @@
 namespace pulsar {
 
 class ClientConnection;
+using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
 using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
+using ResultCallback = std::function<void(Result)>;
 
 /**
  * @class AckGroupingTracker
@@ -38,7 +42,13 @@ using ClientConnectionWeakPtr = 
std::weak_ptr<ClientConnection>;
  */
 class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracker> {
    public:
-    AckGroupingTracker() = default;
+    AckGroupingTracker(std::function<ClientConnectionPtr()> connectionSupplier,
+                       std::function<uint64_t()> requestIdSupplier, uint64_t 
consumerId, bool waitResponse)
+        : connectionSupplier_(connectionSupplier),
+          requestIdSupplier_(requestIdSupplier),
+          consumerId_(consumerId),
+          waitResponse_(waitResponse) {}
+
     virtual ~AckGroupingTracker() = default;
 
     /**
@@ -59,20 +69,23 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
     /**
      * Adding message ID into ACK group for individual ACK.
      * @param[in] msgId ID of the message to be ACKed.
+     * @param[in] callback the callback that is triggered when the message is 
acknowledged
      */
-    virtual void addAcknowledge(const MessageId& msgId) {}
+    virtual void addAcknowledge(const MessageId& msgId, ResultCallback 
callback) {}
 
     /**
      * Adding message ID list into ACK group for individual ACK.
      * @param[in] msgIds of the message to be ACKed.
+     * @param[in] callback the callback that is triggered when the messages 
are acknowledged
      */
-    virtual void addAcknowledgeList(const MessageIdList& msgIds) {}
+    virtual void addAcknowledgeList(const MessageIdList& msgIds, 
ResultCallback callback) {}
 
     /**
      * Adding message ID into ACK group for cumulative ACK.
      * @param[in] msgId ID of the message to be ACKed.
+     * @param[in] callback the callback that is triggered when the message is 
acknowledged
      */
-    virtual void addAcknowledgeCumulative(const MessageId& msgId) {}
+    virtual void addAcknowledgeCumulative(const MessageId& msgId, 
ResultCallback callback) {}
 
     /**
      * Flush all the pending grouped ACKs (as flush() does), and stop period 
ACKs sending.
@@ -91,27 +104,17 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
     virtual void flushAndClean() {}
 
    protected:
-    /**
-     * Immediately send ACK request to broker.
-     * @param[in] connWeakPtr weak pointer of the client connection.
-     * @param[in] consumerId ID of the consumer that performs this ACK.
-     * @param[in] msgId message ID to be ACKed.
-     * @param[in] ackType ACK type, e.g. cumulative.
-     * @return true if the ACK is sent successfully, otherwise false.
-     */
-    bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t 
consumerId, const MessageId& msgId,
-                        CommandAck_AckType ackType);
+    void doImmediateAck(const MessageId& msgId, ResultCallback callback, 
CommandAck_AckType ackType) const;
+    void doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback 
callback) const;
+
+   private:
+    const std::function<ClientConnectionPtr()> connectionSupplier_;
+    const std::function<uint64_t()> requestIdSupplier_;
+    const uint64_t consumerId_;
+
+   protected:
+    const bool waitResponse_;
 
-    /**
-     * Immediately send a set of ACK requests one by one to the broker, it 
only supports individual
-     * ACK.
-     * @param[in] connWeakPtr weak pointer of the client connection.
-     * @param[in] consumerId ID of the consumer that performs this ACK.
-     * @param[in] msgIds message IDs to be ACKed.
-     * @return true if the ACK is sent successfully, otherwise false.
-     */
-    bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t 
consumerId,
-                        const std::set<MessageId>& msgIds);
 };  // class AckGroupingTracker
 
 using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
diff --git a/lib/AckGroupingTrackerDisabled.cc 
b/lib/AckGroupingTrackerDisabled.cc
index 2fe2b2e..b29e1f7 100644
--- a/lib/AckGroupingTrackerDisabled.cc
+++ b/lib/AckGroupingTrackerDisabled.cc
@@ -20,32 +20,24 @@
 #include "AckGroupingTrackerDisabled.h"
 
 #include "HandlerBase.h"
-#include "LogUtils.h"
 #include "ProtoApiEnums.h"
 
 namespace pulsar {
 
-DECLARE_LOG_OBJECT();
-
-AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler, 
uint64_t consumerId)
-    : AckGroupingTracker(), handler_(handler), consumerId_(consumerId) {
-    LOG_INFO("ACK grouping is disabled.");
-}
-
-void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
-    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, 
CommandAck_AckType_Individual);
+void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, 
ResultCallback callback) {
+    doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
 }
 
-void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& 
msgIds) {
+void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& 
msgIds, ResultCallback callback) {
     std::set<MessageId> msgIdSet;
     for (auto&& msgId : msgIds) {
         msgIdSet.emplace(msgId);
     }
-    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
+    doImmediateAck(msgIdSet, callback);
 }
 
-void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& 
msgId) {
-    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, 
CommandAck_AckType_Cumulative);
+void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& 
msgId, ResultCallback callback) {
+    doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
 }
 
 }  // namespace pulsar
diff --git a/lib/AckGroupingTrackerDisabled.h b/lib/AckGroupingTrackerDisabled.h
index 7b41686..9619360 100644
--- a/lib/AckGroupingTrackerDisabled.h
+++ b/lib/AckGroupingTrackerDisabled.h
@@ -34,25 +34,12 @@ class HandlerBase;
  */
 class AckGroupingTrackerDisabled : public AckGroupingTracker {
    public:
+    using AckGroupingTracker::AckGroupingTracker;
     virtual ~AckGroupingTrackerDisabled() = default;
 
-    /**
-     * Constructing ACK grouping tracker for peresistent topics that disabled 
ACK grouping.
-     * @param[in] handler the connection handler.
-     * @param[in] consumerId consumer ID that this tracker belongs to.
-     */
-    AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
-
-    void addAcknowledge(const MessageId& msgId) override;
-    void addAcknowledgeList(const MessageIdList& msgIds) override;
-    void addAcknowledgeCumulative(const MessageId& msgId) override;
-
-   private:
-    //! The connection handler.
-    HandlerBase& handler_;
-
-    //! ID of the consumer that this tracker belongs to.
-    uint64_t consumerId_;
+    void addAcknowledge(const MessageId& msgId, ResultCallback callback) 
override;
+    void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback 
callback) override;
+    void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback 
callback) override;
 };  // class AckGroupingTrackerDisabled
 
 }  // namespace pulsar
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 467e4e5..d90cc87 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -27,13 +27,10 @@
 #include "Commands.h"
 #include "ExecutorService.h"
 #include "HandlerBase.h"
-#include "LogUtils.h"
 #include "MessageIdUtil.h"
 
 namespace pulsar {
 
-DECLARE_LOG_OBJECT();
-
 // Define a customized compare logic whose difference with the default compare 
logic of MessageId is:
 // When two MessageId objects are in the same entry, if only one of them is a 
message in the batch, treat
 // it as a smaller one.
@@ -47,27 +44,6 @@ static int compare(const MessageId& lhs, const MessageId& 
rhs) {
     }
 }
 
-AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
-                                                     const HandlerBasePtr& 
handlerPtr, uint64_t consumerId,
-                                                     long ackGroupingTimeMs, 
long ackGroupingMaxSize)
-    : AckGroupingTracker(),
-      isClosed_(false),
-      handlerWeakPtr_(handlerPtr),
-      consumerId_(consumerId),
-      nextCumulativeAckMsgId_(MessageId::earliest()),
-      requireCumulativeAck_(false),
-      mutexCumulativeAckMsgId_(),
-      pendingIndividualAcks_(),
-      rmutexPendingIndAcks_(),
-      ackGroupingTimeMs_(ackGroupingTimeMs),
-      ackGroupingMaxSize_(ackGroupingMaxSize),
-      executor_(clientPtr->getIOExecutorProvider()->get()),
-      timer_(),
-      mutexTimer_() {
-    LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs 
<< "ms, grouping max size "
-                                                        << ackGroupingMaxSize);
-}
-
 void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
 
 bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
@@ -84,29 +60,55 @@ bool AckGroupingTrackerEnabled::isDuplicate(const 
MessageId& msgId) {
     return this->pendingIndividualAcks_.count(msgId) > 0;
 }
 
-void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId) {
+void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId, 
ResultCallback callback) {
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
     this->pendingIndividualAcks_.insert(msgId);
+    if (waitResponse_) {
+        this->pendingIndividualCallbacks_.emplace_back(callback);
+    } else if (callback) {
+        callback(ResultOk);
+    }
     if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() 
>= this->ackGroupingMaxSize_) {
         this->flush();
     }
 }
 
-void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList& 
msgIds) {
+void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList& 
msgIds, ResultCallback callback) {
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
     for (const auto& msgId : msgIds) {
         this->pendingIndividualAcks_.emplace(msgId);
     }
+    if (waitResponse_) {
+        this->pendingIndividualCallbacks_.emplace_back(callback);
+    } else if (callback) {
+        callback(ResultOk);
+    }
     if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() 
>= this->ackGroupingMaxSize_) {
         this->flush();
     }
 }
 
-void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& 
msgId) {
-    std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
+void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& 
msgId, ResultCallback callback) {
+    std::unique_lock<std::mutex> lock(this->mutexCumulativeAckMsgId_);
     if (compare(msgId, this->nextCumulativeAckMsgId_) > 0) {
         this->nextCumulativeAckMsgId_ = msgId;
         this->requireCumulativeAck_ = true;
+        // Trigger the previous pending callback
+        if (latestCumulativeCallback_) {
+            latestCumulativeCallback_(ResultOk);
+        }
+        if (waitResponse_) {
+            // Move the callback to latestCumulativeCallback_ so that it will 
be triggered when receiving the
+            // AckResponse or being replaced by a newer MessageId
+            latestCumulativeCallback_ = callback;
+            callback = nullptr;
+        } else {
+            latestCumulativeCallback_ = nullptr;
+        }
+    }
+    lock.unlock();
+    if (callback) {
+        callback(ResultOk);
     }
 }
 
@@ -121,27 +123,13 @@ void AckGroupingTrackerEnabled::close() {
 }
 
 void AckGroupingTrackerEnabled::flush() {
-    auto handler = handlerWeakPtr_.lock();
-    if (!handler) {
-        LOG_DEBUG("Reference to the HandlerBase is not valid.");
-        return;
-    }
-    auto cnx = handler->getCnx().lock();
-    if (cnx == nullptr) {
-        LOG_DEBUG("Connection is not ready, grouping ACK failed.");
-        return;
-    }
-
     // Send ACK for cumulative ACK requests.
     {
         std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
         if (this->requireCumulativeAck_) {
-            if (!this->doImmediateAck(cnx, this->consumerId_, 
this->nextCumulativeAckMsgId_,
-                                      CommandAck_AckType_Cumulative)) {
-                // Failed to send ACK.
-                LOG_WARN("Failed to send cumulative ACK.");
-                return;
-            }
+            this->doImmediateAck(this->nextCumulativeAckMsgId_, 
this->latestCumulativeCallback_,
+                                 CommandAck_AckType_Cumulative);
+            this->latestCumulativeCallback_ = nullptr;
             this->requireCumulativeAck_ = false;
         }
     }
@@ -149,7 +137,14 @@ void AckGroupingTrackerEnabled::flush() {
     // Send ACK for individual ACK requests.
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
     if (!this->pendingIndividualAcks_.empty()) {
-        this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
+        std::vector<ResultCallback> callbacks;
+        callbacks.swap(this->pendingIndividualCallbacks_);
+        auto callback = [callbacks](Result result) {
+            for (auto&& callback : callbacks) {
+                callback(result);
+            }
+        };
+        this->doImmediateAck(this->pendingIndividualAcks_, callback);
         this->pendingIndividualAcks_.clear();
     }
 }
@@ -159,6 +154,7 @@ void AckGroupingTrackerEnabled::flushAndClean() {
     {
         std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
         this->nextCumulativeAckMsgId_ = MessageId::earliest();
+        this->latestCumulativeCallback_ = nullptr;
         this->requireCumulativeAck_ = false;
     }
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h
index 1874fd4..ec1d66b 100644
--- a/lib/AckGroupingTrackerEnabled.h
+++ b/lib/AckGroupingTrackerEnabled.h
@@ -46,24 +46,24 @@ using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
  */
 class AckGroupingTrackerEnabled : public AckGroupingTracker {
    public:
-    virtual ~AckGroupingTrackerEnabled() { this->close(); }
+    AckGroupingTrackerEnabled(std::function<ClientConnectionPtr()> 
connectionSupplier,
+                              std::function<uint64_t()> requestIdSupplier, 
uint64_t consumerId,
+                              bool waitResponse, long ackGroupingTimeMs, long 
ackGroupingMaxSize,
+                              const ExecutorServicePtr& executor)
+        : AckGroupingTracker(connectionSupplier, requestIdSupplier, 
consumerId, waitResponse),
+          ackGroupingTimeMs_(ackGroupingTimeMs),
+          ackGroupingMaxSize_(ackGroupingMaxSize),
+          executor_(executor) {
+        pendingIndividualCallbacks_.reserve(ackGroupingMaxSize);
+    }
 
-    /**
-     * Constructing ACK grouping tracker for peresistent topics.
-     * @param[in] clientPtr pointer to client object.
-     * @param[in] handlerPtr the shared pointer to connection handler.
-     * @param[in] consumerId consumer ID that this tracker belongs to.
-     * @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
-     * @param[in] ackGroupingMaxSize max. number of ACK requests can be 
grouped.
-     */
-    AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& 
handlerPtr, uint64_t consumerId,
-                              long ackGroupingTimeMs, long ackGroupingMaxSize);
+    virtual ~AckGroupingTrackerEnabled() { this->close(); }
 
     void start() override;
     bool isDuplicate(const MessageId& msgId) override;
-    void addAcknowledge(const MessageId& msgId) override;
-    void addAcknowledgeList(const MessageIdList& msgIds) override;
-    void addAcknowledgeCumulative(const MessageId& msgId) override;
+    void addAcknowledge(const MessageId& msgId, ResultCallback callback) 
override;
+    void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback 
callback) override;
+    void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback 
callback) override;
     void close() override;
     void flush() override;
     void flushAndClean() override;
@@ -73,21 +73,17 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker 
{
     void scheduleTimer();
 
     //! State
-    std::atomic_bool isClosed_;
-
-    //! The connection handler.
-    HandlerBaseWeakPtr handlerWeakPtr_;
-
-    //! ID of the consumer that this tracker belongs to.
-    uint64_t consumerId_;
+    std::atomic_bool isClosed_{false};
 
     //! Next message ID to be cumulatively cumulatively.
-    MessageId nextCumulativeAckMsgId_;
-    bool requireCumulativeAck_;
+    MessageId nextCumulativeAckMsgId_{MessageId::earliest()};
+    bool requireCumulativeAck_{false};
+    ResultCallback latestCumulativeCallback_;
     std::mutex mutexCumulativeAckMsgId_;
 
     //! Individual ACK requests that have not been sent to broker.
     std::set<MessageId> pendingIndividualAcks_;
+    std::vector<ResultCallback> pendingIndividualCallbacks_;
     std::recursive_mutex rmutexPendingIndAcks_;
 
     //! Time window in milliseconds for grouping ACK requests.
@@ -97,7 +93,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
     const long ackGroupingMaxSize_;
 
     //! ACK request sender's scheduled executor.
-    ExecutorServicePtr executor_;
+    const ExecutorServicePtr executor_;
 
     //! Pointer to a deadline timer.
     DeadlineTimerPtr timer_;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 08cbcc9..5837c50 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -907,6 +907,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
                     handleGetSchemaResponse(incomingCmd.getschemaresponse());
                     break;
 
+                case BaseCommand::ACK_RESPONSE:
+                    handleAckResponse(incomingCmd.ackresponse());
+                    break;
+
                 default:
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
@@ -1788,4 +1792,26 @@ void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResp
     }
 }
 
+void ClientConnection::handleAckResponse(const proto::CommandAckResponse& 
response) {
+    LOG_DEBUG(cnxString_ << "Received AckResponse from server. req_id: " << 
response.request_id());
+
+    Lock lock(mutex_);
+    auto it = pendingRequests_.find(response.request_id());
+    if (it == pendingRequests_.cend()) {
+        lock.unlock();
+        LOG_WARN("Cannot find the cached AckResponse whose req_id is " << 
response.request_id());
+        return;
+    }
+
+    auto promise = it->second.promise;
+    pendingRequests_.erase(it);
+    lock.unlock();
+
+    if (response.has_error()) {
+        promise.setFailed(getResult(response.error(), ""));
+    } else {
+        promise.setValue({});
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index eae18f9..95bb1ab 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -75,6 +75,7 @@ struct OpSendMsg;
 namespace proto {
 class BaseCommand;
 class CommandActiveConsumerChange;
+class CommandAckResponse;
 class CommandMessage;
 class CommandCloseConsumer;
 class CommandCloseProducer;
@@ -390,6 +391,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     void handleGetLastMessageIdResponse(const 
proto::CommandGetLastMessageIdResponse&);
     void handleGetTopicOfNamespaceResponse(const 
proto::CommandGetTopicsOfNamespaceResponse&);
     void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
+    void handleAckResponse(const proto::CommandAckResponse&);
 };
 }  // namespace pulsar
 
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 08e4a10..3b9606a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -92,7 +92,6 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration&
       pool_(clientConfiguration_, ioExecutorProvider_, 
clientConfiguration_.getAuthPtr(), poolConnections),
       producerIdGenerator_(0),
       consumerIdGenerator_(0),
-      requestIdGenerator_(0),
       closingError(ResultOk) {
     std::unique_ptr<LoggerFactory> loggerFactory = 
clientConfiguration_.impl_->takeLogger();
     if (!loggerFactory) {
@@ -713,10 +712,7 @@ uint64_t ClientImpl::newConsumerId() {
     return consumerIdGenerator_++;
 }
 
-uint64_t ClientImpl::newRequestId() {
-    Lock lock(mutex_);
-    return requestIdGenerator_++;
-}
+uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; }
 
 uint64_t ClientImpl::getNumberOfProducers() {
     uint64_t numberOfAliveProducers = 0;
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index de10d0e..dd7ffdd 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -121,6 +121,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
 
     void cleanupConsumer(ConsumerImplBase* address) { 
consumers_.remove(address); }
 
+    std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { 
return requestIdGenerator_; }
+
     friend class PulsarFriend;
 
    private:
@@ -175,7 +177,7 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
 
     uint64_t producerIdGenerator_;
     uint64_t consumerIdGenerator_;
-    uint64_t requestIdGenerator_;
+    std::shared_ptr<std::atomic<uint64_t>> 
requestIdGenerator_{std::make_shared<std::atomic<uint64_t>>(0)};
 
     SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
     SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 0089b86..829be57 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -440,39 +440,73 @@ SharedBuffer Commands::newProducer(const std::string& 
topic, uint64_t producerId
     return writeMessageWithSize(cmd);
 }
 
-SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
-                              CommandAck_AckType ackType, 
CommandAck_ValidationError validationError) {
-    BaseCommand cmd;
-    cmd.set_type(BaseCommand::ACK);
-    CommandAck* ack = cmd.mutable_ack();
+static void configureCommandAck(CommandAck* ack, uint64_t consumerId, int64_t 
ledgerId, int64_t entryId,
+                                const BitSet& ackSet, CommandAck_AckType 
ackType) {
     ack->set_consumer_id(consumerId);
     ack->set_ack_type(static_cast<proto::CommandAck_AckType>(ackType));
-    if (proto::CommandAck_AckType_IsValid(validationError)) {
-        
ack->set_validation_error((proto::CommandAck_ValidationError)validationError);
-    }
     auto* msgId = ack->add_message_id();
     msgId->set_ledgerid(ledgerId);
     msgId->set_entryid(entryId);
     for (auto x : ackSet) {
         msgId->add_ack_set(x);
     }
+}
+
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
+                              CommandAck_AckType ackType) {
+    BaseCommand cmd;
+    cmd.set_type(BaseCommand::ACK);
+    configureCommandAck(cmd.mutable_ack(), consumerId, ledgerId, entryId, 
ackSet, ackType);
     return writeMessageWithSize(cmd);
 }
 
-SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const 
std::set<MessageId>& msgIds) {
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
+                              CommandAck_AckType ackType, 
CommandAck_ValidationError validationError) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::ACK);
     CommandAck* ack = cmd.mutable_ack();
+    
ack->set_validation_error((proto::CommandAck_ValidationError)validationError);
+    configureCommandAck(ack, consumerId, ledgerId, entryId, ackSet, ackType);
+    return writeMessageWithSize(cmd);
+}
+
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
+                              CommandAck_AckType ackType, uint64_t requestId) {
+    BaseCommand cmd;
+    cmd.set_type(BaseCommand::ACK);
+    CommandAck* ack = cmd.mutable_ack();
+    ack->set_request_id(requestId);
+    configureCommandAck(ack, consumerId, ledgerId, entryId, ackSet, ackType);
+    return writeMessageWithSize(cmd);
+}
+
+static void configureCommandAck(CommandAck* ack, uint64_t consumerId, const 
std::set<MessageId>& msgIds) {
     ack->set_consumer_id(consumerId);
     ack->set_ack_type(proto::CommandAck_AckType_Individual);
     for (const auto& msgId : msgIds) {
         auto newMsgId = ack->add_message_id();
         newMsgId->set_ledgerid(msgId.ledgerId());
         newMsgId->set_entryid(msgId.entryId());
-        for (auto x : getMessageIdImpl(msgId)->getBitSet()) {
+        for (auto x : Commands::getMessageIdImpl(msgId)->getBitSet()) {
             newMsgId->add_ack_set(x);
         }
     }
+}
+
+SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const 
std::set<MessageId>& msgIds) {
+    BaseCommand cmd;
+    cmd.set_type(BaseCommand::ACK);
+    configureCommandAck(cmd.mutable_ack(), consumerId, msgIds);
+    return writeMessageWithSize(cmd);
+}
+
+SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const 
std::set<MessageId>& msgIds,
+                                          uint64_t requestId) {
+    BaseCommand cmd;
+    cmd.set_type(BaseCommand::ACK);
+    CommandAck* ack = cmd.mutable_ack();
+    ack->set_request_id(requestId);
+    configureCommandAck(ack, consumerId, msgIds);
     return writeMessageWithSize(cmd);
 }
 
diff --git a/lib/Commands.h b/lib/Commands.h
index 7d13e2a..0331173 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -116,9 +116,16 @@ class Commands {
                                     ProducerAccessMode accessMode, 
boost::optional<uint64_t> topicEpoch,
                                     const std::string& 
initialSubscriptionName);
 
+    static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
+                               CommandAck_AckType ackType);
+    static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
+                               CommandAck_AckType ackType, uint64_t requestId);
     static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
                                CommandAck_AckType ackType, 
CommandAck_ValidationError validationError);
+
     static SharedBuffer newMultiMessageAck(uint64_t consumerId, const 
std::set<MessageId>& msgIds);
+    static SharedBuffer newMultiMessageAck(uint64_t consumerId, const 
std::set<MessageId>& msgIds,
+                                           uint64_t requestId);
 
     static SharedBuffer newFlow(uint64_t consumerId, uint32_t messagePermits);
 
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 1497a2d..4764b46 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -310,4 +310,11 @@ const std::vector<ConsumerInterceptorPtr>& 
ConsumerConfiguration::getInterceptor
     return impl_->interceptors;
 }
 
+ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool 
ackReceiptEnabled) {
+    impl_->ackReceiptEnabled = ackReceiptEnabled;
+    return *this;
+}
+
+bool ConsumerConfiguration::isAckReceiptEnabled() const { return 
impl_->ackReceiptEnabled; }
+
 }  // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 4ada424..e84aa0a 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -59,6 +59,7 @@ struct ConsumerConfigurationImpl {
     long expireTimeOfIncompleteChunkedMessageMs{60000};
     bool batchIndexAckEnabled{false};
     std::vector<ConsumerInterceptorPtr> interceptors;
+    bool ackReceiptEnabled{false};
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 5400e1a..ab31e0b 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -85,7 +85,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       consumerName_(config_.getConsumerName()),
       messageListenerRunning_(true),
       negativeAcksTracker_(client, *this, conf),
-      ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
       readCompacted_(conf.isReadCompacted()),
       startMessageId_(startMessageId),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
@@ -182,18 +181,35 @@ const std::string& ConsumerImpl::getTopic() const { 
return *topic_; }
 void ConsumerImpl::start() {
     HandlerBase::start();
 
+    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
+    auto connectionSupplier = [weakSelf]() -> ClientConnectionPtr {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return nullptr;
+        }
+        return self->getCnx().lock();
+    };
+
+    // NOTE: start() is always called in `ClientImpl`'s method, so lock() returns not null
+    const auto requestIdGenerator = client_.lock()->getRequestIdGenerator();
+    const auto requestIdSupplier = [requestIdGenerator] { return (*requestIdGenerator)++; };
+
     // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
     // constructor completed.
     if (TopicName::get(*topic_)->isPersistent()) {
         if (config_.getAckGroupingTimeMs() > 0) {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
-                client_.lock(), get_shared_this_ptr(), consumerId_, 
config_.getAckGroupingTimeMs(),
-                config_.getAckGroupingMaxSize()));
+                connectionSupplier, requestIdSupplier, consumerId_, 
config_.isAckReceiptEnabled(),
+                config_.getAckGroupingTimeMs(), 
config_.getAckGroupingMaxSize(),
+                client_.lock()->getIOExecutorProvider()->get()));
         } else {
-            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, 
consumerId_));
+            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(
+                connectionSupplier, requestIdSupplier, consumerId_, 
config_.isAckReceiptEnabled()));
         }
     } else {
         LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
+        ackGroupingTrackerPtr_.reset(new 
AckGroupingTracker(connectionSupplier, requestIdSupplier,
+                                                            consumerId_, 
config_.isAckReceiptEnabled()));
     }
     ackGroupingTrackerPtr_->start();
 }
@@ -1089,12 +1105,13 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& 
msgId, ResultCallback callb
     const auto& msgIdToAck = pair.first;
     const bool readyToAck = pair.second;
     if (readyToAck) {
-        ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck);
+        ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck, callback);
+    } else {
+        if (callback) {
+            callback(ResultOk);
+        }
     }
     interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, 
msgId);
-    if (callback) {
-        callback(ResultOk);
-    }
 }
 
 void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, 
ResultCallback callback) {
@@ -1111,10 +1128,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageIdList& 
messageIdList, ResultCa
         // with the Java client.
         interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, 
msgId);
     }
-    this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck);
-    if (callback) {
-        callback(ResultOk);
-    }
+    this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck, 
callback);
 }
 
 void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, 
ResultCallback callback) {
@@ -1132,12 +1146,11 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const 
MessageId& msgId, ResultCall
     if (readyToAck) {
         consumerStatsBasePtr_->messageAcknowledged(ResultOk, 
CommandAck_AckType_Cumulative, 1);
         unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
-        ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck);
-    }
-    interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), 
ResultOk, msgId);
-    if (callback) {
+        ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck, callback);
+    } else if (callback) {
         callback(ResultOk);
     }
+    interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), 
ResultOk, msgId);
 }
 
 bool ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType 
consumerType) {
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
index 0818466..9ca1310 100644
--- a/tests/AcknowledgeTest.cc
+++ b/tests/AcknowledgeTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include <atomic>
 #include <chrono>
 #include <set>
 #include <thread>
@@ -39,7 +40,8 @@ static std::string adminUrl = "http://localhost:8080/";
 
 extern std::string unique_str();
 
-class AcknowledgeTest : public testing::TestWithParam<int> {};
+class AcknowledgeTest
+    : public testing::TestWithParam<std::tuple<int /* ack grouping time */, bool /* ack with receipt */>> {};
 
 TEST_P(AcknowledgeTest, testAckMsgList) {
     Client client(lookupUrl);
@@ -55,7 +57,8 @@ TEST_P(AcknowledgeTest, testAckMsgList) {
 
     ConsumerConfiguration consumerConfig;
     consumerConfig.setAckGroupingMaxSize(numMsg);
-    consumerConfig.setAckGroupingTimeMs(GetParam());
+    consumerConfig.setAckGroupingTimeMs(std::get<0>(GetParam()));
+    consumerConfig.setAckReceiptEnabled(std::get<1>(GetParam()));
     consumerConfig.setUnAckedMessagesTimeoutMs(10000);
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, 
consumer));
@@ -118,7 +121,8 @@ TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
     ConsumerConfiguration consumerConfig;
     // set ack grouping max size is 10
     consumerConfig.setAckGroupingMaxSize(10);
-    consumerConfig.setAckGroupingTimeMs(GetParam());
+    consumerConfig.setAckGroupingTimeMs(std::get<0>(GetParam()));
+    consumerConfig.setAckReceiptEnabled(std::get<1>(GetParam()));
     consumerConfig.setUnAckedMessagesTimeoutMs(10000);
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, 
consumer));
@@ -315,4 +319,65 @@ TEST_F(AcknowledgeTest, testInvalidMessageId) {
     ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg));
 }
 
-INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, 
testing::Values(100, 0));
+TEST_F(AcknowledgeTest, testAckReceiptEnabled) {
+    Client client(lookupUrl);
+    const std::string topic = "test-ack-receipt-enabled" + unique_str();
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    std::vector<MessageId> msgIds;
+    for (int i = 0; i < 5; i++) {
+        MessageId msgId;
+        ASSERT_EQ(ResultOk,
+                  producer.send(MessageBuilder().setContent("msg-" + 
std::to_string(i)).build(), msgId));
+        msgIds.emplace_back(msgId);
+    }
+
+    constexpr long ackGroupingTimeMs = 200;
+    Consumer consumer;
+    ConsumerConfiguration conf;
+    conf.setAckGroupingTimeMs(ackGroupingTimeMs);
+    conf.setAckReceiptEnabled(true);
+    conf.setBatchIndexAckEnabled(true);
+    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+
+    using namespace std::chrono;
+    // The ACK grouping timer starts after it's subscribed successfully. To ensure the acknowledgments
+    // complete after `ackGroupingTimeMs`, record the start timestamp before subscribing
+    auto now = high_resolution_clock::now();
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
+
+    std::atomic<decltype(std::this_thread::get_id())> threadId[3];
+    std::atomic_long durationMs[3];
+    std::atomic<Result> result[3];
+    Latch latch{3};
+
+    auto createCallback = [&](int i) -> ResultCallback {
+        return [i, now, &threadId, &durationMs, &result, &latch](Result 
result0) {
+            threadId[i] = std::this_thread::get_id();
+            durationMs[i] =
+                
duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - 
now).count();
+            result[i] = result0;
+            latch.countdown();
+        };
+    };
+    consumer.acknowledgeAsync(msgIds[1], createCallback(0));
+    consumer.acknowledgeCumulativeAsync(msgIds[2], createCallback(1));
+    consumer.acknowledgeAsync(msgIds, createCallback(2));
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(3)));
+    for (int i = 0; i < 3; i++) {
+        LOG_INFO("Ack time: " << durationMs[i] << "ms");
+        EXPECT_TRUE(durationMs[i] > ackGroupingTimeMs && durationMs[i] < 
ackGroupingTimeMs + 100);
+        EXPECT_NE(threadId[i], std::this_thread::get_id());
+        EXPECT_EQ(result[i], ResultOk);
+    }
+
+    client.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
+                         testing::Combine(testing::Values(100, 0), 
testing::Values(true, false)),
+                         [](const testing::TestParamInfo<std::tuple<int, 
bool>>& info) {
+                             return std::to_string(std::get<0>(info.param)) + 
"_" +
+                                    std::to_string(std::get<1>(info.param));
+                         });
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index e668f6f..5e28d21 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -3513,42 +3513,12 @@ TEST(BasicEndToEndTest, testSendCallback) {
     client.close();
 }
 
-class AckGroupingTrackerMock : public AckGroupingTracker {
-   public:
-    explicit AckGroupingTrackerMock(bool mockAck) : mockAck_(mockAck) {}
-
-    bool callDoImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t 
consumerId, const MessageId &msgId,
-                            CommandAck_AckType ackType) {
-        if (!this->mockAck_) {
-            // Not mocking ACK, expose this method.
-            return this->doImmediateAck(connWeakPtr, consumerId, msgId, 
ackType);
-        } else {
-            // Mocking ACK.
-            return true;
-        }
-    }
-
-    bool callDoImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t 
consumerId,
-                            const std::set<MessageId> &msgIds) {
-        if (!this->mockAck_) {
-            // Not mocking ACK, expose this method.
-            return this->doImmediateAck(connWeakPtr, consumerId, msgIds);
-        } else {
-            // Mocking ACK.
-            return true;
-        }
-    }
-
-   private:
-    bool mockAck_;
-};  // class AckGroupingTrackerMock
-
 TEST(BasicEndToEndTest, testAckGroupingTrackerDefaultBehavior) {
     ConsumerConfiguration configConsumer;
     ASSERT_EQ(configConsumer.getAckGroupingTimeMs(), 100);
     ASSERT_EQ(configConsumer.getAckGroupingMaxSize(), 1000);
 
-    AckGroupingTracker tracker;
+    AckGroupingTracker tracker{nullptr, nullptr, 0, false};
     Message msg;
     ASSERT_FALSE(tracker.isDuplicate(msg.getMessageId()));
 }
@@ -3586,13 +3556,15 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerSingleAckBehavior) {
     }
 
     // Send ACK.
-    AckGroupingTrackerMock tracker(false);
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+    AckGroupingTrackerDisabled tracker([&consumerImpl]() { return 
consumerImpl.getCnx().lock(); },
+                                       [&clientImplPtr] { return 
clientImplPtr->newRequestId(); },
+                                       consumerImpl.getConsumerId(), false);
     tracker.start();
     for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
         auto connPtr = connWeakPtr.lock();
         ASSERT_NE(connPtr, nullptr);
-        ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, 
consumerImpl.getConsumerId(), recvMsgId[msgIdx],
-                                               CommandAck_AckType_Individual));
+        tracker.addAcknowledge(recvMsgId[msgIdx], nullptr);
     }
     Message msg;
     ASSERT_EQ(ResultTimeout, consumer.receive(msg, 1000));
@@ -3621,7 +3593,6 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerMultiAckBehavior) {
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
 
     auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
-    auto connWeakPtr = PulsarFriend::getClientConnection(consumerImpl);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3637,11 +3608,12 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerMultiAckBehavior) {
     }
 
     // Send ACK.
-    AckGroupingTrackerMock tracker(false);
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+    AckGroupingTrackerDisabled tracker([&consumerImpl]() { return 
consumerImpl.getCnx().lock(); },
+                                       [&clientImplPtr] { return 
clientImplPtr->newRequestId(); },
+                                       consumerImpl.getConsumerId(), false);
     tracker.start();
-    std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
-    ASSERT_EQ(restMsgId.size(), numMsg);
-    ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, 
consumerImpl.getConsumerId(), restMsgId));
+    tracker.addAcknowledgeList(recvMsgId, nullptr);
     consumer.close();
 
     std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -3684,9 +3656,10 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledIndividualAck) {
     }
 
     // Send ACK.
-    AckGroupingTrackerDisabled tracker(consumerImpl, 
consumerImpl.getConsumerId());
+    AckGroupingTrackerDisabled tracker([&consumerImpl] { return 
consumerImpl.getCnx().lock(); }, nullptr,
+                                       consumerImpl.getConsumerId(), false);
     for (auto &msgId : recvMsgId) {
-        tracker.addAcknowledge(msgId);
+        tracker.addAcknowledge(msgId, nullptr);
     }
     consumer.close();
 
@@ -3730,9 +3703,10 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledCumulativeAck) {
     }
 
     // Send ACK.
-    AckGroupingTrackerDisabled tracker(consumerImpl, 
consumerImpl.getConsumerId());
+    AckGroupingTrackerDisabled tracker([&consumerImpl] { return 
consumerImpl.getCnx().lock(); }, nullptr,
+                                       consumerImpl.getConsumerId(), false);
     auto &latestMsgId = *std::max_element(recvMsgId.begin(), recvMsgId.end());
-    tracker.addAcknowledgeCumulative(latestMsgId);
+    tracker.addAcknowledgeCumulative(latestMsgId, nullptr);
     consumer.close();
 
     std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -3745,10 +3719,7 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerDisabledCumulativeAck) {
 
 class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
    public:
-    AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const 
HandlerBasePtr &handlerPtr,
-                                  uint64_t consumerId, long ackGroupingTimeMs, 
long ackGroupingMaxSize)
-        : AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, 
ackGroupingTimeMs,
-                                    ackGroupingMaxSize) {}
+    using AckGroupingTrackerEnabled::AckGroupingTrackerEnabled;
     const std::set<MessageId> &getPendingIndividualAcks() { return 
this->pendingIndividualAcks_; }
     const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
     const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
@@ -3791,14 +3762,15 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledIndividualAck) {
     }
 
     auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
-        clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), 
ackGroupingTimeMs, ackGroupingMaxSize);
+        [&consumerImpl] { return consumerImpl->getCnx().lock(); }, nullptr, 
consumerImpl->getConsumerId(),
+        false, ackGroupingTimeMs, ackGroupingMaxSize, 
clientImplPtr->getIOExecutorProvider()->get());
     tracker->start();
     ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
     ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
     ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
     for (auto &msgId : recvMsgId) {
         ASSERT_FALSE(tracker->isDuplicate(msgId));
-        tracker->addAcknowledge(msgId);
+        tracker->addAcknowledge(msgId, nullptr);
         ASSERT_TRUE(tracker->isDuplicate(msgId));
     }
     ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());
@@ -3852,7 +3824,8 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledCumulativeAck) {
     std::sort(recvMsgId.begin(), recvMsgId.end());
 
     auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
-        clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), 
ackGroupingTimeMs, ackGroupingMaxSize);
+        [&consumerImpl0] { return consumerImpl0->getCnx().lock(); }, nullptr, 
consumerImpl0->getConsumerId(),
+        false, ackGroupingTimeMs, ackGroupingMaxSize, 
clientImplPtr->getIOExecutorProvider()->get());
     tracker0->start();
     ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
     ASSERT_FALSE(tracker0->requireCumulativeAck());
@@ -3861,7 +3834,7 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledCumulativeAck) {
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
         ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
     }
-    tracker0->addAcknowledgeCumulative(targetMsgId);
+    tracker0->addAcknowledgeCumulative(targetMsgId, nullptr);
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
         ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
     }
@@ -3888,9 +3861,10 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledCumulativeAck) {
     auto ret = consumer.receive(msg, 1000);
     ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << 
msg.getDataAsString();
     auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
-        clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), 
ackGroupingTimeMs, ackGroupingMaxSize);
+        [&consumerImpl1] { return consumerImpl1->getCnx().lock(); }, nullptr, 
consumerImpl1->getConsumerId(),
+        false, ackGroupingTimeMs, ackGroupingMaxSize, 
clientImplPtr->getIOExecutorProvider()->get());
     tracker1->start();
-    tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
+    tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1], nullptr);
     tracker1->close();
     consumer.close();
 
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index 9b91722..f378501 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -70,6 +70,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
     ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024);
     ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
     ASSERT_EQ(conf.isBatchIndexAckEnabled(), false);
+    ASSERT_EQ(conf.isAckReceiptEnabled(), false);
 }
 
 TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -168,6 +169,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
 
     conf.setBatchIndexAckEnabled(true);
     ASSERT_EQ(conf.isBatchIndexAckEnabled(), true);
+
+    conf.setAckReceiptEnabled(true);
+    ASSERT_TRUE(conf.isAckReceiptEnabled());
 }
 
 TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {

Reply via email to