This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1f91575 Support waiting for the ACK response (#232)
1f91575 is described below
commit 1f9157554fc42c9445185b934e17d9a5fc898f7a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 29 17:45:12 2023 +0800
Support waiting for the ACK response (#232)
* Support waiting for the ACK response
Fixes https://github.com/apache/pulsar-client-cpp/issues/102
### Modifications
Add a consumer configuration to set `ackReceiptEnabled`. If it's true,
when sending a CommandAck request, generate the request id and set the
`request_id` field so that the broker will respond with a
`CommandAckResponse`. Here we pass the shared pointer to an atomic
integer so that we no longer need to hold a weak pointer to
`ClientImpl`.
Pass the user-defined callback to the ACK grouping tracker. When
`ackReceiptEnabled` is true, trigger the callback only after receiving
the ACK response.
### Verifications
- Add `testAckReceiptEnabled` to verify that, when `ackReceiptEnabled` is
true, the callback will be triggered after `ackGroupingMaxTime`.
- Support configuring `ackReceiptEnabled` for existing `TEST_P`
parameterized tests in `AcknowledgeTest`.
* Use multiple overloads for newAck
* Fix callback is not called
---
include/pulsar/ConsumerConfiguration.h | 16 ++++++
lib/AckGroupingTracker.cc | 89 ++++++++++++++++++++++------------
lib/AckGroupingTracker.h | 51 ++++++++++---------
lib/AckGroupingTrackerDisabled.cc | 20 +++-----
lib/AckGroupingTrackerDisabled.h | 21 ++------
lib/AckGroupingTrackerEnabled.cc | 88 ++++++++++++++++-----------------
lib/AckGroupingTrackerEnabled.h | 44 ++++++++---------
lib/ClientConnection.cc | 26 ++++++++++
lib/ClientConnection.h | 2 +
lib/ClientImpl.cc | 6 +--
lib/ClientImpl.h | 4 +-
lib/Commands.cc | 54 +++++++++++++++++----
lib/Commands.h | 7 +++
lib/ConsumerConfiguration.cc | 7 +++
lib/ConsumerConfigurationImpl.h | 1 +
lib/ConsumerImpl.cc | 45 +++++++++++------
tests/AcknowledgeTest.cc | 73 ++++++++++++++++++++++++++--
tests/BasicEndToEndTest.cc | 80 +++++++++++-------------------
tests/ConsumerConfigurationTest.cc | 4 ++
19 files changed, 394 insertions(+), 244 deletions(-)
diff --git a/include/pulsar/ConsumerConfiguration.h
b/include/pulsar/ConsumerConfiguration.h
index ee0c634..0e6634d 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -629,6 +629,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;
+ /**
+ * Whether to receive the ACK receipt from broker.
+ *
+ * By default, when Consumer::acknowledge is called, it won't wait until
the corresponding response from
+ * broker. After it's enabled, the `acknowledge` method will return a
Result that indicates if the
+ * acknowledgment succeeded.
+ *
+ * Default: false
+ */
+ ConsumerConfiguration& setAckReceiptEnabled(bool ackReceiptEnabled);
+
+ /**
+ * The associated getter of setAckReceiptEnabled.
+ */
+ bool isAckReceiptEnabled() const;
+
friend class PulsarWrapper;
friend class PulsarFriend;
diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 064d4ea..9a47135 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -19,6 +19,9 @@
#include "AckGroupingTracker.h"
+#include <atomic>
+#include <limits>
+
#include "BitSet.h"
#include "ClientConnection.h"
#include "Commands.h"
@@ -29,24 +32,33 @@ namespace pulsar {
DECLARE_LOG_OBJECT();
-inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const
MessageId& msgId,
- CommandAck_AckType ackType) {
- const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
- auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(),
bitSet, ackType, -1);
- cnx->sendCommand(cmd);
- LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ",
" << msgId.entryId() << "]");
-}
-
-bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr,
uint64_t consumerId,
- const MessageId& msgId,
CommandAck_AckType ackType) {
- auto cnx = connWeakPtr.lock();
- if (cnx == nullptr) {
- LOG_DEBUG("Connection is not ready, ACK failed for message - [" <<
msgId.ledgerId() << ", "
- <<
msgId.entryId() << "]");
- return false;
+void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback
callback,
+ CommandAck_AckType ackType) const {
+ const auto cnx = connectionSupplier_();
+ if (!cnx) {
+ LOG_DEBUG("Connection is not ready, ACK failed for " << msgId);
+ if (callback) {
+ callback(ResultAlreadyClosed);
+ }
+ return;
+ }
+ const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
+ if (waitResponse_) {
+ const auto requestId = requestIdSupplier_();
+ cnx->sendRequestWithId(
+ Commands::newAck(consumerId_, msgId.ledgerId(),
msgId.entryId(), ackSet, ackType, requestId),
+ requestId)
+ .addListener([callback](Result result, const ResponseData&) {
+ if (callback) {
+ callback(result);
+ }
+ });
+ } else {
+ cnx->sendCommand(Commands::newAck(consumerId_, msgId.ledgerId(),
msgId.entryId(), ackSet, ackType));
+ if (callback) {
+ callback(ResultOk);
+ }
}
- sendAck(cnx, consumerId, msgId, ackType);
- return true;
}
static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>&
msgIds) {
@@ -62,25 +74,42 @@ static std::ostream& operator<<(std::ostream& os, const
std::set<MessageId>& msg
return os;
}
-bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr,
uint64_t consumerId,
- const std::set<MessageId>& msgIds) {
- auto cnx = connWeakPtr.lock();
- if (cnx == nullptr) {
- LOG_DEBUG("Connection is not ready, ACK failed.");
- return false;
+void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds,
ResultCallback callback) const {
+ const auto cnx = connectionSupplier_();
+ if (!cnx) {
+ LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
+ if (callback) {
+ callback(ResultAlreadyClosed);
+ }
+ return;
}
if
(Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion()))
{
- auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
- cnx->sendCommand(cmd);
- LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: "
<< msgIds);
+ if (waitResponse_) {
+ const auto requestId = requestIdSupplier_();
+ cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_,
msgIds, requestId), requestId)
+ .addListener([callback](Result result, const ResponseData&) {
+ if (callback) {
+ callback(result);
+ }
+ });
+ } else {
+ cnx->sendCommand(Commands::newMultiMessageAck(consumerId_,
msgIds));
+ if (callback) {
+ callback(ResultOk);
+ }
+ }
} else {
- // Broker does not support multi-message ACK, use multiple individual
ACKs instead.
- for (const auto& msgId : msgIds) {
- sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
+ auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
+ auto wrappedCallback = [callback, count](Result result) {
+ if (--*count == 0 && callback) {
+ callback(result);
+ }
+ };
+ for (auto&& msgId : msgIds) {
+ doImmediateAck(msgId, wrappedCallback,
CommandAck_AckType_Individual);
}
}
- return true;
}
} // namespace pulsar
diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h
index 0a986a0..2b48142 100644
--- a/lib/AckGroupingTracker.h
+++ b/lib/AckGroupingTracker.h
@@ -20,8 +20,10 @@
#define LIB_ACKGROUPINGTRACKER_H_
#include <pulsar/MessageId.h>
+#include <pulsar/Result.h>
#include <cstdint>
+#include <functional>
#include <set>
#include "ProtoApiEnums.h"
@@ -29,7 +31,9 @@
namespace pulsar {
class ClientConnection;
+using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
+using ResultCallback = std::function<void(Result)>;
/**
* @class AckGroupingTracker
@@ -38,7 +42,13 @@ using ClientConnectionWeakPtr =
std::weak_ptr<ClientConnection>;
*/
class AckGroupingTracker : public
std::enable_shared_from_this<AckGroupingTracker> {
public:
- AckGroupingTracker() = default;
+ AckGroupingTracker(std::function<ClientConnectionPtr()> connectionSupplier,
+ std::function<uint64_t()> requestIdSupplier, uint64_t
consumerId, bool waitResponse)
+ : connectionSupplier_(connectionSupplier),
+ requestIdSupplier_(requestIdSupplier),
+ consumerId_(consumerId),
+ waitResponse_(waitResponse) {}
+
virtual ~AckGroupingTracker() = default;
/**
@@ -59,20 +69,23 @@ class AckGroupingTracker : public
std::enable_shared_from_this<AckGroupingTracke
/**
* Adding message ID into ACK group for individual ACK.
* @param[in] msgId ID of the message to be ACKed.
+ * @param[in] callback the callback that is triggered when the message is
acknowledged
*/
- virtual void addAcknowledge(const MessageId& msgId) {}
+ virtual void addAcknowledge(const MessageId& msgId, ResultCallback
callback) {}
/**
* Adding message ID list into ACK group for individual ACK.
* @param[in] msgIds of the message to be ACKed.
+ * @param[in] callback the callback that is triggered when the messages
are acknowledged
*/
- virtual void addAcknowledgeList(const MessageIdList& msgIds) {}
+ virtual void addAcknowledgeList(const MessageIdList& msgIds,
ResultCallback callback) {}
/**
* Adding message ID into ACK group for cumulative ACK.
* @param[in] msgId ID of the message to be ACKed.
+ * @param[in] callback the callback that is triggered when the message is
acknowledged
*/
- virtual void addAcknowledgeCumulative(const MessageId& msgId) {}
+ virtual void addAcknowledgeCumulative(const MessageId& msgId,
ResultCallback callback) {}
/**
* Flush all the pending grouped ACKs (as flush() does), and stop period
ACKs sending.
@@ -91,27 +104,17 @@ class AckGroupingTracker : public
std::enable_shared_from_this<AckGroupingTracke
virtual void flushAndClean() {}
protected:
- /**
- * Immediately send ACK request to broker.
- * @param[in] connWeakPtr weak pointer of the client connection.
- * @param[in] consumerId ID of the consumer that performs this ACK.
- * @param[in] msgId message ID to be ACKed.
- * @param[in] ackType ACK type, e.g. cumulative.
- * @return true if the ACK is sent successfully, otherwise false.
- */
- bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t
consumerId, const MessageId& msgId,
- CommandAck_AckType ackType);
+ void doImmediateAck(const MessageId& msgId, ResultCallback callback,
CommandAck_AckType ackType) const;
+ void doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback
callback) const;
+
+ private:
+ const std::function<ClientConnectionPtr()> connectionSupplier_;
+ const std::function<uint64_t()> requestIdSupplier_;
+ const uint64_t consumerId_;
+
+ protected:
+ const bool waitResponse_;
- /**
- * Immediately send a set of ACK requests one by one to the broker, it
only supports individual
- * ACK.
- * @param[in] connWeakPtr weak pointer of the client connection.
- * @param[in] consumerId ID of the consumer that performs this ACK.
- * @param[in] msgIds message IDs to be ACKed.
- * @return true if the ACK is sent successfully, otherwise false.
- */
- bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t
consumerId,
- const std::set<MessageId>& msgIds);
}; // class AckGroupingTracker
using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
diff --git a/lib/AckGroupingTrackerDisabled.cc
b/lib/AckGroupingTrackerDisabled.cc
index 2fe2b2e..b29e1f7 100644
--- a/lib/AckGroupingTrackerDisabled.cc
+++ b/lib/AckGroupingTrackerDisabled.cc
@@ -20,32 +20,24 @@
#include "AckGroupingTrackerDisabled.h"
#include "HandlerBase.h"
-#include "LogUtils.h"
#include "ProtoApiEnums.h"
namespace pulsar {
-DECLARE_LOG_OBJECT();
-
-AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler,
uint64_t consumerId)
- : AckGroupingTracker(), handler_(handler), consumerId_(consumerId) {
- LOG_INFO("ACK grouping is disabled.");
-}
-
-void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
- this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId,
CommandAck_AckType_Individual);
+void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId,
ResultCallback callback) {
+ doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
}
-void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList&
msgIds) {
+void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList&
msgIds, ResultCallback callback) {
std::set<MessageId> msgIdSet;
for (auto&& msgId : msgIds) {
msgIdSet.emplace(msgId);
}
- this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
+ doImmediateAck(msgIdSet, callback);
}
-void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId&
msgId) {
- this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId,
CommandAck_AckType_Cumulative);
+void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId&
msgId, ResultCallback callback) {
+ doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
}
} // namespace pulsar
diff --git a/lib/AckGroupingTrackerDisabled.h b/lib/AckGroupingTrackerDisabled.h
index 7b41686..9619360 100644
--- a/lib/AckGroupingTrackerDisabled.h
+++ b/lib/AckGroupingTrackerDisabled.h
@@ -34,25 +34,12 @@ class HandlerBase;
*/
class AckGroupingTrackerDisabled : public AckGroupingTracker {
public:
+ using AckGroupingTracker::AckGroupingTracker;
virtual ~AckGroupingTrackerDisabled() = default;
- /**
- * Constructing ACK grouping tracker for peresistent topics that disabled
ACK grouping.
- * @param[in] handler the connection handler.
- * @param[in] consumerId consumer ID that this tracker belongs to.
- */
- AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
-
- void addAcknowledge(const MessageId& msgId) override;
- void addAcknowledgeList(const MessageIdList& msgIds) override;
- void addAcknowledgeCumulative(const MessageId& msgId) override;
-
- private:
- //! The connection handler.
- HandlerBase& handler_;
-
- //! ID of the consumer that this tracker belongs to.
- uint64_t consumerId_;
+ void addAcknowledge(const MessageId& msgId, ResultCallback callback)
override;
+ void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback
callback) override;
+ void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback
callback) override;
}; // class AckGroupingTrackerDisabled
} // namespace pulsar
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 467e4e5..d90cc87 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -27,13 +27,10 @@
#include "Commands.h"
#include "ExecutorService.h"
#include "HandlerBase.h"
-#include "LogUtils.h"
#include "MessageIdUtil.h"
namespace pulsar {
-DECLARE_LOG_OBJECT();
-
// Define a customized compare logic whose difference with the default compare
logic of MessageId is:
// When two MessageId objects are in the same entry, if only one of them is a
message in the batch, treat
// it as a smaller one.
@@ -47,27 +44,6 @@ static int compare(const MessageId& lhs, const MessageId&
rhs) {
}
}
-AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
- const HandlerBasePtr&
handlerPtr, uint64_t consumerId,
- long ackGroupingTimeMs,
long ackGroupingMaxSize)
- : AckGroupingTracker(),
- isClosed_(false),
- handlerWeakPtr_(handlerPtr),
- consumerId_(consumerId),
- nextCumulativeAckMsgId_(MessageId::earliest()),
- requireCumulativeAck_(false),
- mutexCumulativeAckMsgId_(),
- pendingIndividualAcks_(),
- rmutexPendingIndAcks_(),
- ackGroupingTimeMs_(ackGroupingTimeMs),
- ackGroupingMaxSize_(ackGroupingMaxSize),
- executor_(clientPtr->getIOExecutorProvider()->get()),
- timer_(),
- mutexTimer_() {
- LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs
<< "ms, grouping max size "
- << ackGroupingMaxSize);
-}
-
void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
@@ -84,29 +60,55 @@ bool AckGroupingTrackerEnabled::isDuplicate(const
MessageId& msgId) {
return this->pendingIndividualAcks_.count(msgId) > 0;
}
-void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId) {
+void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId,
ResultCallback callback) {
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
this->pendingIndividualAcks_.insert(msgId);
+ if (waitResponse_) {
+ this->pendingIndividualCallbacks_.emplace_back(callback);
+ } else if (callback) {
+ callback(ResultOk);
+ }
if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size()
>= this->ackGroupingMaxSize_) {
this->flush();
}
}
-void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList&
msgIds) {
+void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList&
msgIds, ResultCallback callback) {
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
for (const auto& msgId : msgIds) {
this->pendingIndividualAcks_.emplace(msgId);
}
+ if (waitResponse_) {
+ this->pendingIndividualCallbacks_.emplace_back(callback);
+ } else if (callback) {
+ callback(ResultOk);
+ }
if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size()
>= this->ackGroupingMaxSize_) {
this->flush();
}
}
-void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId&
msgId) {
- std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
+void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId&
msgId, ResultCallback callback) {
+ std::unique_lock<std::mutex> lock(this->mutexCumulativeAckMsgId_);
if (compare(msgId, this->nextCumulativeAckMsgId_) > 0) {
this->nextCumulativeAckMsgId_ = msgId;
this->requireCumulativeAck_ = true;
+ // Trigger the previous pending callback
+ if (latestCumulativeCallback_) {
+ latestCumulativeCallback_(ResultOk);
+ }
+ if (waitResponse_) {
+ // Move the callback to latestCumulativeCallback_ so that it will
be triggered when receiving the
+ // AckResponse or being replaced by a newer MessageId
+ latestCumulativeCallback_ = callback;
+ callback = nullptr;
+ } else {
+ latestCumulativeCallback_ = nullptr;
+ }
+ }
+ lock.unlock();
+ if (callback) {
+ callback(ResultOk);
}
}
@@ -121,27 +123,13 @@ void AckGroupingTrackerEnabled::close() {
}
void AckGroupingTrackerEnabled::flush() {
- auto handler = handlerWeakPtr_.lock();
- if (!handler) {
- LOG_DEBUG("Reference to the HandlerBase is not valid.");
- return;
- }
- auto cnx = handler->getCnx().lock();
- if (cnx == nullptr) {
- LOG_DEBUG("Connection is not ready, grouping ACK failed.");
- return;
- }
-
// Send ACK for cumulative ACK requests.
{
std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
if (this->requireCumulativeAck_) {
- if (!this->doImmediateAck(cnx, this->consumerId_,
this->nextCumulativeAckMsgId_,
- CommandAck_AckType_Cumulative)) {
- // Failed to send ACK.
- LOG_WARN("Failed to send cumulative ACK.");
- return;
- }
+ this->doImmediateAck(this->nextCumulativeAckMsgId_,
this->latestCumulativeCallback_,
+ CommandAck_AckType_Cumulative);
+ this->latestCumulativeCallback_ = nullptr;
this->requireCumulativeAck_ = false;
}
}
@@ -149,7 +137,14 @@ void AckGroupingTrackerEnabled::flush() {
// Send ACK for individual ACK requests.
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
if (!this->pendingIndividualAcks_.empty()) {
- this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
+ std::vector<ResultCallback> callbacks;
+ callbacks.swap(this->pendingIndividualCallbacks_);
+ auto callback = [callbacks](Result result) {
+ for (auto&& callback : callbacks) {
+ callback(result);
+ }
+ };
+ this->doImmediateAck(this->pendingIndividualAcks_, callback);
this->pendingIndividualAcks_.clear();
}
}
@@ -159,6 +154,7 @@ void AckGroupingTrackerEnabled::flushAndClean() {
{
std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
this->nextCumulativeAckMsgId_ = MessageId::earliest();
+ this->latestCumulativeCallback_ = nullptr;
this->requireCumulativeAck_ = false;
}
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h
index 1874fd4..ec1d66b 100644
--- a/lib/AckGroupingTrackerEnabled.h
+++ b/lib/AckGroupingTrackerEnabled.h
@@ -46,24 +46,24 @@ using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
*/
class AckGroupingTrackerEnabled : public AckGroupingTracker {
public:
- virtual ~AckGroupingTrackerEnabled() { this->close(); }
+ AckGroupingTrackerEnabled(std::function<ClientConnectionPtr()>
connectionSupplier,
+ std::function<uint64_t()> requestIdSupplier,
uint64_t consumerId,
+ bool waitResponse, long ackGroupingTimeMs, long
ackGroupingMaxSize,
+ const ExecutorServicePtr& executor)
+ : AckGroupingTracker(connectionSupplier, requestIdSupplier,
consumerId, waitResponse),
+ ackGroupingTimeMs_(ackGroupingTimeMs),
+ ackGroupingMaxSize_(ackGroupingMaxSize),
+ executor_(executor) {
+ pendingIndividualCallbacks_.reserve(ackGroupingMaxSize);
+ }
- /**
- * Constructing ACK grouping tracker for peresistent topics.
- * @param[in] clientPtr pointer to client object.
- * @param[in] handlerPtr the shared pointer to connection handler.
- * @param[in] consumerId consumer ID that this tracker belongs to.
- * @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
- * @param[in] ackGroupingMaxSize max. number of ACK requests can be
grouped.
- */
- AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr&
handlerPtr, uint64_t consumerId,
- long ackGroupingTimeMs, long ackGroupingMaxSize);
+ virtual ~AckGroupingTrackerEnabled() { this->close(); }
void start() override;
bool isDuplicate(const MessageId& msgId) override;
- void addAcknowledge(const MessageId& msgId) override;
- void addAcknowledgeList(const MessageIdList& msgIds) override;
- void addAcknowledgeCumulative(const MessageId& msgId) override;
+ void addAcknowledge(const MessageId& msgId, ResultCallback callback)
override;
+ void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback
callback) override;
+ void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback
callback) override;
void close() override;
void flush() override;
void flushAndClean() override;
@@ -73,21 +73,17 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker
{
void scheduleTimer();
//! State
- std::atomic_bool isClosed_;
-
- //! The connection handler.
- HandlerBaseWeakPtr handlerWeakPtr_;
-
- //! ID of the consumer that this tracker belongs to.
- uint64_t consumerId_;
+ std::atomic_bool isClosed_{false};
//! Next message ID to be cumulatively cumulatively.
- MessageId nextCumulativeAckMsgId_;
- bool requireCumulativeAck_;
+ MessageId nextCumulativeAckMsgId_{MessageId::earliest()};
+ bool requireCumulativeAck_{false};
+ ResultCallback latestCumulativeCallback_;
std::mutex mutexCumulativeAckMsgId_;
//! Individual ACK requests that have not been sent to broker.
std::set<MessageId> pendingIndividualAcks_;
+ std::vector<ResultCallback> pendingIndividualCallbacks_;
std::recursive_mutex rmutexPendingIndAcks_;
//! Time window in milliseconds for grouping ACK requests.
@@ -97,7 +93,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
const long ackGroupingMaxSize_;
//! ACK request sender's scheduled executor.
- ExecutorServicePtr executor_;
+ const ExecutorServicePtr executor_;
//! Pointer to a deadline timer.
DeadlineTimerPtr timer_;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 08cbcc9..5837c50 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -907,6 +907,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand&
incomingCmd) {
handleGetSchemaResponse(incomingCmd.getschemaresponse());
break;
+ case BaseCommand::ACK_RESPONSE:
+ handleAckResponse(incomingCmd.ackresponse());
+ break;
+
default:
LOG_WARN(cnxString_ << "Received invalid message from
server");
close();
@@ -1788,4 +1792,26 @@ void ClientConnection::handleGetSchemaResponse(const
proto::CommandGetSchemaResp
}
}
+void ClientConnection::handleAckResponse(const proto::CommandAckResponse&
response) {
+ LOG_DEBUG(cnxString_ << "Received AckResponse from server. req_id: " <<
response.request_id());
+
+ Lock lock(mutex_);
+ auto it = pendingRequests_.find(response.request_id());
+ if (it == pendingRequests_.cend()) {
+ lock.unlock();
+ LOG_WARN("Cannot find the cached AckResponse whose req_id is " <<
response.request_id());
+ return;
+ }
+
+ auto promise = it->second.promise;
+ pendingRequests_.erase(it);
+ lock.unlock();
+
+ if (response.has_error()) {
+ promise.setFailed(getResult(response.error(), ""));
+ } else {
+ promise.setValue({});
+ }
+}
+
} // namespace pulsar
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index eae18f9..95bb1ab 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -75,6 +75,7 @@ struct OpSendMsg;
namespace proto {
class BaseCommand;
class CommandActiveConsumerChange;
+class CommandAckResponse;
class CommandMessage;
class CommandCloseConsumer;
class CommandCloseProducer;
@@ -390,6 +391,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handleGetLastMessageIdResponse(const
proto::CommandGetLastMessageIdResponse&);
void handleGetTopicOfNamespaceResponse(const
proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
+ void handleAckResponse(const proto::CommandAckResponse&);
};
} // namespace pulsar
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 08e4a10..3b9606a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -92,7 +92,6 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration&
pool_(clientConfiguration_, ioExecutorProvider_,
clientConfiguration_.getAuthPtr(), poolConnections),
producerIdGenerator_(0),
consumerIdGenerator_(0),
- requestIdGenerator_(0),
closingError(ResultOk) {
std::unique_ptr<LoggerFactory> loggerFactory =
clientConfiguration_.impl_->takeLogger();
if (!loggerFactory) {
@@ -713,10 +712,7 @@ uint64_t ClientImpl::newConsumerId() {
return consumerIdGenerator_++;
}
-uint64_t ClientImpl::newRequestId() {
- Lock lock(mutex_);
- return requestIdGenerator_++;
-}
+uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; }
uint64_t ClientImpl::getNumberOfProducers() {
uint64_t numberOfAliveProducers = 0;
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index de10d0e..dd7ffdd 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -121,6 +121,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
void cleanupConsumer(ConsumerImplBase* address) {
consumers_.remove(address); }
+ std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const {
return requestIdGenerator_; }
+
friend class PulsarFriend;
private:
@@ -175,7 +177,7 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
uint64_t producerIdGenerator_;
uint64_t consumerIdGenerator_;
- uint64_t requestIdGenerator_;
+ std::shared_ptr<std::atomic<uint64_t>>
requestIdGenerator_{std::make_shared<std::atomic<uint64_t>>(0)};
SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 0089b86..829be57 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -440,39 +440,73 @@ SharedBuffer Commands::newProducer(const std::string&
topic, uint64_t producerId
return writeMessageWithSize(cmd);
}
-SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
- CommandAck_AckType ackType,
CommandAck_ValidationError validationError) {
- BaseCommand cmd;
- cmd.set_type(BaseCommand::ACK);
- CommandAck* ack = cmd.mutable_ack();
+static void configureCommandAck(CommandAck* ack, uint64_t consumerId, int64_t
ledgerId, int64_t entryId,
+ const BitSet& ackSet, CommandAck_AckType
ackType) {
ack->set_consumer_id(consumerId);
ack->set_ack_type(static_cast<proto::CommandAck_AckType>(ackType));
- if (proto::CommandAck_AckType_IsValid(validationError)) {
-
ack->set_validation_error((proto::CommandAck_ValidationError)validationError);
- }
auto* msgId = ack->add_message_id();
msgId->set_ledgerid(ledgerId);
msgId->set_entryid(entryId);
for (auto x : ackSet) {
msgId->add_ack_set(x);
}
+}
+
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
+ CommandAck_AckType ackType) {
+ BaseCommand cmd;
+ cmd.set_type(BaseCommand::ACK);
+ configureCommandAck(cmd.mutable_ack(), consumerId, ledgerId, entryId,
ackSet, ackType);
return writeMessageWithSize(cmd);
}
-SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const
std::set<MessageId>& msgIds) {
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
+ CommandAck_AckType ackType,
CommandAck_ValidationError validationError) {
BaseCommand cmd;
cmd.set_type(BaseCommand::ACK);
CommandAck* ack = cmd.mutable_ack();
+
ack->set_validation_error((proto::CommandAck_ValidationError)validationError);
+ configureCommandAck(ack, consumerId, ledgerId, entryId, ackSet, ackType);
+ return writeMessageWithSize(cmd);
+}
+
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
+ CommandAck_AckType ackType, uint64_t requestId) {
+ BaseCommand cmd;
+ cmd.set_type(BaseCommand::ACK);
+ CommandAck* ack = cmd.mutable_ack();
+ ack->set_request_id(requestId);
+ configureCommandAck(ack, consumerId, ledgerId, entryId, ackSet, ackType);
+ return writeMessageWithSize(cmd);
+}
+
+static void configureCommandAck(CommandAck* ack, uint64_t consumerId, const
std::set<MessageId>& msgIds) {
ack->set_consumer_id(consumerId);
ack->set_ack_type(proto::CommandAck_AckType_Individual);
for (const auto& msgId : msgIds) {
auto newMsgId = ack->add_message_id();
newMsgId->set_ledgerid(msgId.ledgerId());
newMsgId->set_entryid(msgId.entryId());
- for (auto x : getMessageIdImpl(msgId)->getBitSet()) {
+ for (auto x : Commands::getMessageIdImpl(msgId)->getBitSet()) {
newMsgId->add_ack_set(x);
}
}
+}
+
+SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const
std::set<MessageId>& msgIds) {
+ BaseCommand cmd;
+ cmd.set_type(BaseCommand::ACK);
+ configureCommandAck(cmd.mutable_ack(), consumerId, msgIds);
+ return writeMessageWithSize(cmd);
+}
+
+SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const
std::set<MessageId>& msgIds,
+ uint64_t requestId) {
+ BaseCommand cmd;
+ cmd.set_type(BaseCommand::ACK);
+ CommandAck* ack = cmd.mutable_ack();
+ ack->set_request_id(requestId);
+ configureCommandAck(ack, consumerId, msgIds);
return writeMessageWithSize(cmd);
}
diff --git a/lib/Commands.h b/lib/Commands.h
index 7d13e2a..0331173 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -116,9 +116,16 @@ class Commands {
ProducerAccessMode accessMode,
boost::optional<uint64_t> topicEpoch,
const std::string&
initialSubscriptionName);
+ static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
+ CommandAck_AckType ackType);
+ static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
+ CommandAck_AckType ackType, uint64_t requestId);
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
CommandAck_AckType ackType,
CommandAck_ValidationError validationError);
+
static SharedBuffer newMultiMessageAck(uint64_t consumerId, const
std::set<MessageId>& msgIds);
+ static SharedBuffer newMultiMessageAck(uint64_t consumerId, const
std::set<MessageId>& msgIds,
+ uint64_t requestId);
static SharedBuffer newFlow(uint64_t consumerId, uint32_t messagePermits);
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 1497a2d..4764b46 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -310,4 +310,11 @@ const std::vector<ConsumerInterceptorPtr>& ConsumerConfiguration::getInterceptor
return impl_->interceptors;
}
+ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool
ackReceiptEnabled) {
+ impl_->ackReceiptEnabled = ackReceiptEnabled;
+ return *this;
+}
+
+bool ConsumerConfiguration::isAckReceiptEnabled() const { return
impl_->ackReceiptEnabled; }
+
} // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 4ada424..e84aa0a 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -59,6 +59,7 @@ struct ConsumerConfigurationImpl {
long expireTimeOfIncompleteChunkedMessageMs{60000};
bool batchIndexAckEnabled{false};
std::vector<ConsumerInterceptorPtr> interceptors;
+ bool ackReceiptEnabled{false};
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 5400e1a..ab31e0b 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -85,7 +85,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
consumerName_(config_.getConsumerName()),
messageListenerRunning_(true),
negativeAcksTracker_(client, *this, conf),
- ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
readCompacted_(conf.isReadCompacted()),
startMessageId_(startMessageId),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
@@ -182,18 +181,35 @@ const std::string& ConsumerImpl::getTopic() const { return *topic_; }
void ConsumerImpl::start() {
HandlerBase::start();
+ std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
+ auto connectionSupplier = [weakSelf]() -> ClientConnectionPtr {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return nullptr;
+ }
+ return self->getCnx().lock();
+ };
+
+ // NOTE: start() is always called in `ClientImpl`'s method, so lock() returns not null
+ const auto requestIdGenerator = client_.lock()->getRequestIdGenerator();
+ const auto requestIdSupplier = [requestIdGenerator] { return
(*requestIdGenerator)++; };
+
// Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
// constructor completed.
if (TopicName::get(*topic_)->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
- client_.lock(), get_shared_this_ptr(), consumerId_,
config_.getAckGroupingTimeMs(),
- config_.getAckGroupingMaxSize()));
+ connectionSupplier, requestIdSupplier, consumerId_,
config_.isAckReceiptEnabled(),
+ config_.getAckGroupingTimeMs(),
config_.getAckGroupingMaxSize(),
+ client_.lock()->getIOExecutorProvider()->get()));
} else {
- ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this,
consumerId_));
+ ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(
+ connectionSupplier, requestIdSupplier, consumerId_,
config_.isAckReceiptEnabled()));
}
} else {
LOG_INFO(getName() << "ACK will NOT be sent to broker for this
non-persistent topic.");
+ ackGroupingTrackerPtr_.reset(new
AckGroupingTracker(connectionSupplier, requestIdSupplier,
+ consumerId_,
config_.isAckReceiptEnabled()));
}
ackGroupingTrackerPtr_->start();
}
@@ -1089,12 +1105,13 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
const auto& msgIdToAck = pair.first;
const bool readyToAck = pair.second;
if (readyToAck) {
- ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck);
+ ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck, callback);
+ } else {
+ if (callback) {
+ callback(ResultOk);
+ }
}
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk,
msgId);
- if (callback) {
- callback(ResultOk);
- }
}
void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList,
ResultCallback callback) {
@@ -1111,10 +1128,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCa
// with the Java client.
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk,
msgId);
}
- this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck);
- if (callback) {
- callback(ResultOk);
- }
+ this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck,
callback);
}
void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
ResultCallback callback) {
@@ -1132,12 +1146,11 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCall
if (readyToAck) {
consumerStatsBasePtr_->messageAcknowledged(ResultOk,
CommandAck_AckType_Cumulative, 1);
unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
- ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck);
- }
- interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()),
ResultOk, msgId);
- if (callback) {
+ ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck, callback);
+ } else if (callback) {
callback(ResultOk);
}
+ interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()),
ResultOk, msgId);
}
bool ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType
consumerType) {
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
index 0818466..9ca1310 100644
--- a/tests/AcknowledgeTest.cc
+++ b/tests/AcknowledgeTest.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <atomic>
#include <chrono>
#include <set>
#include <thread>
@@ -39,7 +40,8 @@ static std::string adminUrl = "http://localhost:8080/";
extern std::string unique_str();
-class AcknowledgeTest : public testing::TestWithParam<int> {};
+class AcknowledgeTest
+ : public testing::TestWithParam<std::tuple<int /* ack grouping time */,
bool /* ack with receipt */>> {};
TEST_P(AcknowledgeTest, testAckMsgList) {
Client client(lookupUrl);
@@ -55,7 +57,8 @@ TEST_P(AcknowledgeTest, testAckMsgList) {
ConsumerConfiguration consumerConfig;
consumerConfig.setAckGroupingMaxSize(numMsg);
- consumerConfig.setAckGroupingTimeMs(GetParam());
+ consumerConfig.setAckGroupingTimeMs(std::get<0>(GetParam()));
+ consumerConfig.setAckReceiptEnabled(std::get<1>(GetParam()));
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig,
consumer));
@@ -118,7 +121,8 @@ TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
ConsumerConfiguration consumerConfig;
// set ack grouping max size is 10
consumerConfig.setAckGroupingMaxSize(10);
- consumerConfig.setAckGroupingTimeMs(GetParam());
+ consumerConfig.setAckGroupingTimeMs(std::get<0>(GetParam()));
+ consumerConfig.setAckReceiptEnabled(std::get<1>(GetParam()));
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig,
consumer));
@@ -315,4 +319,65 @@ TEST_F(AcknowledgeTest, testInvalidMessageId) {
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg));
}
-INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
testing::Values(100, 0));
+TEST_F(AcknowledgeTest, testAckReceiptEnabled) {
+ Client client(lookupUrl);
+ const std::string topic = "test-ack-receipt-enabled" + unique_str();
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ std::vector<MessageId> msgIds;
+ for (int i = 0; i < 5; i++) {
+ MessageId msgId;
+ ASSERT_EQ(ResultOk,
+ producer.send(MessageBuilder().setContent("msg-" +
std::to_string(i)).build(), msgId));
+ msgIds.emplace_back(msgId);
+ }
+
+ constexpr long ackGroupingTimeMs = 200;
+ Consumer consumer;
+ ConsumerConfiguration conf;
+ conf.setAckGroupingTimeMs(ackGroupingTimeMs);
+ conf.setAckReceiptEnabled(true);
+ conf.setBatchIndexAckEnabled(true);
+ conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+
+ using namespace std::chrono;
+ // The ACK grouping timer starts after it's subscribed successfully. To ensure the acknowledgments
+ // complete after `ackGroupingTimeMs`, record the start timestamp before subscribing
+ auto now = high_resolution_clock::now();
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
+
+ std::atomic<decltype(std::this_thread::get_id())> threadId[3];
+ std::atomic_long durationMs[3];
+ std::atomic<Result> result[3];
+ Latch latch{3};
+
+ auto createCallback = [&](int i) -> ResultCallback {
+ return [i, now, &threadId, &durationMs, &result, &latch](Result
result0) {
+ threadId[i] = std::this_thread::get_id();
+ durationMs[i] =
+
duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() -
now).count();
+ result[i] = result0;
+ latch.countdown();
+ };
+ };
+ consumer.acknowledgeAsync(msgIds[1], createCallback(0));
+ consumer.acknowledgeCumulativeAsync(msgIds[2], createCallback(1));
+ consumer.acknowledgeAsync(msgIds, createCallback(2));
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(3)));
+ for (int i = 0; i < 3; i++) {
+ LOG_INFO("Ack time: " << durationMs[i] << "ms");
+ EXPECT_TRUE(durationMs[i] > ackGroupingTimeMs && durationMs[i] <
ackGroupingTimeMs + 100);
+ EXPECT_NE(threadId[i], std::this_thread::get_id());
+ EXPECT_EQ(result[i], ResultOk);
+ }
+
+ client.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
+ testing::Combine(testing::Values(100, 0),
testing::Values(true, false)),
+ [](const testing::TestParamInfo<std::tuple<int,
bool>>& info) {
+ return std::to_string(std::get<0>(info.param)) +
"_" +
+ std::to_string(std::get<1>(info.param));
+ });
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index e668f6f..5e28d21 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -3513,42 +3513,12 @@ TEST(BasicEndToEndTest, testSendCallback) {
client.close();
}
-class AckGroupingTrackerMock : public AckGroupingTracker {
- public:
- explicit AckGroupingTrackerMock(bool mockAck) : mockAck_(mockAck) {}
-
- bool callDoImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t
consumerId, const MessageId &msgId,
- CommandAck_AckType ackType) {
- if (!this->mockAck_) {
- // Not mocking ACK, expose this method.
- return this->doImmediateAck(connWeakPtr, consumerId, msgId,
ackType);
- } else {
- // Mocking ACK.
- return true;
- }
- }
-
- bool callDoImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t
consumerId,
- const std::set<MessageId> &msgIds) {
- if (!this->mockAck_) {
- // Not mocking ACK, expose this method.
- return this->doImmediateAck(connWeakPtr, consumerId, msgIds);
- } else {
- // Mocking ACK.
- return true;
- }
- }
-
- private:
- bool mockAck_;
-}; // class AckGroupingTrackerMock
-
TEST(BasicEndToEndTest, testAckGroupingTrackerDefaultBehavior) {
ConsumerConfiguration configConsumer;
ASSERT_EQ(configConsumer.getAckGroupingTimeMs(), 100);
ASSERT_EQ(configConsumer.getAckGroupingMaxSize(), 1000);
- AckGroupingTracker tracker;
+ AckGroupingTracker tracker{nullptr, nullptr, 0, false};
Message msg;
ASSERT_FALSE(tracker.isDuplicate(msg.getMessageId()));
}
@@ -3586,13 +3556,15 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {
}
// Send ACK.
- AckGroupingTrackerMock tracker(false);
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+ AckGroupingTrackerDisabled tracker([&consumerImpl]() { return
consumerImpl.getCnx().lock(); },
+ [&clientImplPtr] { return
clientImplPtr->newRequestId(); },
+ consumerImpl.getConsumerId(), false);
tracker.start();
for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
auto connPtr = connWeakPtr.lock();
ASSERT_NE(connPtr, nullptr);
- ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr,
consumerImpl.getConsumerId(), recvMsgId[msgIdx],
- CommandAck_AckType_Individual));
+ tracker.addAcknowledge(recvMsgId[msgIdx], nullptr);
}
Message msg;
ASSERT_EQ(ResultTimeout, consumer.receive(msg, 1000));
@@ -3621,7 +3593,6 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
- auto connWeakPtr = PulsarFriend::getClientConnection(consumerImpl);
// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
@@ -3637,11 +3608,12 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {
}
// Send ACK.
- AckGroupingTrackerMock tracker(false);
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+ AckGroupingTrackerDisabled tracker([&consumerImpl]() { return
consumerImpl.getCnx().lock(); },
+ [&clientImplPtr] { return
clientImplPtr->newRequestId(); },
+ consumerImpl.getConsumerId(), false);
tracker.start();
- std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
- ASSERT_EQ(restMsgId.size(), numMsg);
- ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr,
consumerImpl.getConsumerId(), restMsgId));
+ tracker.addAcknowledgeList(recvMsgId, nullptr);
consumer.close();
std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -3684,9 +3656,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledIndividualAck) {
}
// Send ACK.
- AckGroupingTrackerDisabled tracker(consumerImpl,
consumerImpl.getConsumerId());
+ AckGroupingTrackerDisabled tracker([&consumerImpl] { return
consumerImpl.getCnx().lock(); }, nullptr,
+ consumerImpl.getConsumerId(), false);
for (auto &msgId : recvMsgId) {
- tracker.addAcknowledge(msgId);
+ tracker.addAcknowledge(msgId, nullptr);
}
consumer.close();
@@ -3730,9 +3703,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {
}
// Send ACK.
- AckGroupingTrackerDisabled tracker(consumerImpl,
consumerImpl.getConsumerId());
+ AckGroupingTrackerDisabled tracker([&consumerImpl] { return
consumerImpl.getCnx().lock(); }, nullptr,
+ consumerImpl.getConsumerId(), false);
auto &latestMsgId = *std::max_element(recvMsgId.begin(), recvMsgId.end());
- tracker.addAcknowledgeCumulative(latestMsgId);
+ tracker.addAcknowledgeCumulative(latestMsgId, nullptr);
consumer.close();
std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -3745,10 +3719,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {
class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
public:
- AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const
HandlerBasePtr &handlerPtr,
- uint64_t consumerId, long ackGroupingTimeMs,
long ackGroupingMaxSize)
- : AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId,
ackGroupingTimeMs,
- ackGroupingMaxSize) {}
+ using AckGroupingTrackerEnabled::AckGroupingTrackerEnabled;
const std::set<MessageId> &getPendingIndividualAcks() { return
this->pendingIndividualAcks_; }
const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
@@ -3791,14 +3762,15 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
}
auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
- clientImplPtr, consumerImpl, consumerImpl->getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
+ [&consumerImpl] { return consumerImpl->getCnx().lock(); }, nullptr,
consumerImpl->getConsumerId(),
+ false, ackGroupingTimeMs, ackGroupingMaxSize,
clientImplPtr->getIOExecutorProvider()->get());
tracker->start();
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
for (auto &msgId : recvMsgId) {
ASSERT_FALSE(tracker->isDuplicate(msgId));
- tracker->addAcknowledge(msgId);
+ tracker->addAcknowledge(msgId, nullptr);
ASSERT_TRUE(tracker->isDuplicate(msgId));
}
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());
@@ -3852,7 +3824,8 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
std::sort(recvMsgId.begin(), recvMsgId.end());
auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
- clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
+ [&consumerImpl0] { return consumerImpl0->getCnx().lock(); }, nullptr,
consumerImpl0->getConsumerId(),
+ false, ackGroupingTimeMs, ackGroupingMaxSize,
clientImplPtr->getIOExecutorProvider()->get());
tracker0->start();
ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
ASSERT_FALSE(tracker0->requireCumulativeAck());
@@ -3861,7 +3834,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
}
- tracker0->addAcknowledgeCumulative(targetMsgId);
+ tracker0->addAcknowledgeCumulative(targetMsgId, nullptr);
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
@@ -3888,9 +3861,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " <<
msg.getDataAsString();
auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
- clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
+ [&consumerImpl1] { return consumerImpl1->getCnx().lock(); }, nullptr,
consumerImpl1->getConsumerId(),
+ false, ackGroupingTimeMs, ackGroupingMaxSize,
clientImplPtr->getIOExecutorProvider()->get());
tracker1->start();
- tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
+ tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1], nullptr);
tracker1->close();
consumer.close();
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index 9b91722..f378501 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -70,6 +70,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024);
ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
ASSERT_EQ(conf.isBatchIndexAckEnabled(), false);
+ ASSERT_EQ(conf.isAckReceiptEnabled(), false);
}
TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -168,6 +169,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setBatchIndexAckEnabled(true);
ASSERT_EQ(conf.isBatchIndexAckEnabled(), true);
+
+ conf.setAckReceiptEnabled(true);
+ ASSERT_TRUE(conf.isAckReceiptEnabled());
}
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {