This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b40749dd4d [C++] Unblock all threads when Pulsar client is closed
(#15726)
6b40749dd4d is described below
commit 6b40749dd4d6be5ea93fc987986a65209c5a8a56
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 24 10:44:20 2022 -0700
[C++] Unblock all threads when Pulsar client is closed (#15726)
* [C++] Unblock all threads when Pulsar client is closed
* Fixed formatting
* Added unit tests for UnboundedBlockingQueue
* Addressed comments
* Fixed indentation
---
pulsar-client-cpp/include/pulsar/Result.h | 2 +
pulsar-client-cpp/include/pulsar/c/result.h | 3 +-
pulsar-client-cpp/lib/BlockingQueue.h | 155 +++---------------
pulsar-client-cpp/lib/ClientImpl.cc | 2 +
pulsar-client-cpp/lib/ConsumerImpl.cc | 11 +-
pulsar-client-cpp/lib/ExecutorService.cc | 4 +-
pulsar-client-cpp/lib/MemoryLimitController.cc | 15 +-
pulsar-client-cpp/lib/MemoryLimitController.h | 5 +-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 9 +-
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 3 +-
pulsar-client-cpp/lib/ProducerImpl.cc | 18 ++-
pulsar-client-cpp/lib/Result.cc | 3 +
pulsar-client-cpp/lib/Semaphore.cc | 16 +-
pulsar-client-cpp/lib/Semaphore.h | 5 +-
pulsar-client-cpp/lib/UnboundedBlockingQueue.h | 27 ++--
pulsar-client-cpp/python/src/enums.cc | 3 +-
pulsar-client-cpp/python/src/exceptions.cc | 1 +
pulsar-client-cpp/tests/BlockingQueueTest.cc | 113 ++++++-------
pulsar-client-cpp/tests/SemaphoreTest.cc | 23 ++-
.../tests/UnboundedBlockingQueueTest.cc | 178 +++++++++++++++++++++
20 files changed, 367 insertions(+), 229 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Result.h
b/pulsar-client-cpp/include/pulsar/Result.h
index 91f68584955..a7fdd04f99a 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -88,6 +88,8 @@ enum Result
ResultProducerFenced, /// Producer was fenced
by broker
ResultMemoryBufferIsFull, /// Client-wide memory limit has been reached
+
+ ResultInterrupted, /// Interrupted while waiting to dequeue
};
// Return string representation of result code
diff --git a/pulsar-client-cpp/include/pulsar/c/result.h
b/pulsar-client-cpp/include/pulsar/c/result.h
index ea9321cfd84..a95d1e5862f 100644
--- a/pulsar-client-cpp/include/pulsar/c/result.h
+++ b/pulsar-client-cpp/include/pulsar/c/result.h
@@ -90,7 +90,8 @@ typedef enum
pulsar_result_TransactionNotFound, /// Transaction not
found
pulsar_result_ProducerFenced, /// Producer was
fenced by broker
- pulsar_result_MemoryBufferIsFull /// Client-wide memory limit has been
reached
+ pulsar_result_MemoryBufferIsFull, /// Client-wide memory limit has been
reached
+ pulsar_result_Interrupted, /// Interrupted while waiting to dequeue
} pulsar_result;
// Return string representation of result code
diff --git a/pulsar-client-cpp/lib/BlockingQueue.h
b/pulsar-client-cpp/lib/BlockingQueue.h
index 2814c4f20c5..d09166fdf26 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -33,14 +33,14 @@ template <typename Container>
struct QueueNotEmpty {
const Container& queue_;
QueueNotEmpty(const Container& queue) : queue_(queue) {}
- bool operator()() const { return !queue_.isEmptyNoMutex(); }
+ bool operator()() const { return !queue_.isEmptyNoMutex() ||
queue_.isClosedNoMutex(); }
};
template <typename Container>
struct QueueNotFull {
const Container& queue_;
QueueNotFull(const Container& queue) : queue_(queue) {}
- bool operator()() const { return !queue_.isFullNoMutex(); }
+ bool operator()() const { return !queue_.isFullNoMutex() ||
queue_.isClosedNoMutex(); }
};
template <typename T>
@@ -50,117 +50,21 @@ class BlockingQueue {
typedef typename Container::iterator iterator;
typedef typename Container::const_iterator const_iterator;
- class ReservedSpot {
- public:
- ReservedSpot() : queue_(), released_(true) {}
+ BlockingQueue(size_t maxSize) : maxSize_(maxSize), mutex_(),
queue_(maxSize) {}
- ReservedSpot(BlockingQueue<T>& queue) : queue_(&queue),
released_(false) {}
-
- ~ReservedSpot() { release(); }
-
- void release() {
- if (!released_) {
- queue_->releaseReservedSpot();
- released_ = true;
- }
- }
-
- private:
- BlockingQueue<T>* queue_;
- bool released_;
-
- friend class BlockingQueue<T>;
- };
-
- BlockingQueue(size_t maxSize) : maxSize_(maxSize), mutex_(),
queue_(maxSize), reservedSpots_(0) {}
-
- bool tryReserve(size_t noOfSpots) {
- assert(noOfSpots <= maxSize_);
- Lock lock(mutex_);
- if (noOfSpots <= maxSize_ - (reservedSpots_ + queue_.size())) {
- reservedSpots_ += noOfSpots;
- return true;
- }
- return false;
- }
-
- void reserve(size_t noOfSpots) {
- assert(noOfSpots <= maxSize_);
- Lock lock(mutex_);
- while (noOfSpots--) {
- queueFullCondition.wait(lock, QueueNotFull<BlockingQueue<T>
>(*this));
- reservedSpots_++;
- }
- }
-
- void release(size_t noOfSpots) {
- Lock lock(mutex_);
- assert(noOfSpots <= reservedSpots_);
- bool wasFull = isFullNoMutex();
- reservedSpots_ -= noOfSpots;
- lock.unlock();
-
- if (wasFull) {
- // Notify that one spot is now available
- queueFullCondition.notify_all();
- }
- }
-
- ReservedSpot reserve() {
+ bool push(const T& value) {
Lock lock(mutex_);
- // If the queue is full, wait for space to be available
- queueFullCondition.wait(lock, QueueNotFull<BlockingQueue<T> >(*this));
- reservedSpots_++;
- return ReservedSpot(*this);
- }
- void push(const T& value, bool wasReserved = false) {
- Lock lock(mutex_);
- if (wasReserved) {
- reservedSpots_--;
- }
// If the queue is full, wait for space to be available
queueFullCondition.wait(lock, QueueNotFull<BlockingQueue<T> >(*this));
- bool wasEmpty = queue_.empty();
- queue_.push_back(value);
- lock.unlock();
- if (wasEmpty) {
- // Notify that an element is pushed
- queueEmptyCondition.notify_all();
- }
- }
-
- void push(const T& value, ReservedSpot& spot) {
- Lock lock(mutex_);
-
- // Since the value already had a spot reserved in the queue, we need to
- // discount it
- assert(reservedSpots_ > 0);
- reservedSpots_--;
- spot.released_ = true;
-
- bool wasEmpty = queue_.empty();
- queue_.push_back(value);
- lock.unlock();
- if (wasEmpty) {
- // Notify that an element is pushed
- queueEmptyCondition.notify_all();
- }
- }
-
- bool tryPush(const T& value) {
- Lock lock(mutex_);
-
- // Need to consider queue_.size() + reserved spot
- if (isFullNoMutex()) {
+ if (isClosedNoMutex()) {
return false;
}
bool wasEmpty = queue_.empty();
queue_.push_back(value);
lock.unlock();
-
if (wasEmpty) {
// Notify that an element is pushed
queueEmptyCondition.notify_all();
@@ -169,35 +73,28 @@ class BlockingQueue {
return true;
}
- void pop() {
+ bool pop(T& value) {
Lock lock(mutex_);
+
// If the queue is empty, wait until an element is available to be
popped
queueEmptyCondition.wait(lock, QueueNotEmpty<BlockingQueue<T>
>(*this));
- bool wasFull = isFullNoMutex();
- queue_.pop_front();
- lock.unlock();
-
- if (wasFull) {
- // Notify that an element is popped
- queueFullCondition.notify_all();
+ if (isClosedNoMutex()) {
+ return false;
}
- }
- void pop(T& value) {
- Lock lock(mutex_);
- // If the queue is empty, wait until an element is available to be
popped
- queueEmptyCondition.wait(lock, QueueNotEmpty<BlockingQueue<T>
>(*this));
value = queue_.front();
-
bool wasFull = isFullNoMutex();
queue_.pop_front();
+
lock.unlock();
if (wasFull) {
// Notify that an element is popped
queueFullCondition.notify_all();
}
+
+ return true;
}
template <typename Duration>
@@ -207,6 +104,10 @@ class BlockingQueue {
return false;
}
+ if (isClosedNoMutex()) {
+ return false;
+ }
+
bool wasFull = isFullNoMutex();
value = queue_.front();
queue_.pop_front();
@@ -263,34 +164,28 @@ class BlockingQueue {
iterator end() { return queue_.end(); }
- int reservedSpots() const { return reservedSpots_; }
-
- private:
- void releaseReservedSpot() {
+ void close() {
Lock lock(mutex_);
- bool wasFull = isFullNoMutex();
- --reservedSpots_;
- lock.unlock();
-
- if (wasFull) {
- // Notify that one spot is now available
- queueFullCondition.notify_all();
- }
+ isClosed_ = true;
+ queueEmptyCondition.notify_all();
+ queueFullCondition.notify_all();
}
+ private:
bool isEmptyNoMutex() const { return queue_.empty(); }
- bool isFullNoMutex() const { return (queue_.size() + reservedSpots_) ==
maxSize_; }
+ bool isFullNoMutex() const { return queue_.size() == maxSize_; }
+
+ bool isClosedNoMutex() const { return isClosed_; }
const size_t maxSize_;
mutable std::mutex mutex_;
std::condition_variable queueFullCondition;
std::condition_variable queueEmptyCondition;
Container queue_;
- int reservedSpots_;
+ bool isClosed_ = false;
typedef std::unique_lock<std::mutex> Lock;
- friend class QueueReservedSpot;
friend struct QueueNotEmpty<BlockingQueue<T> >;
friend struct QueueNotFull<BlockingQueue<T> >;
};
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index 931fadeaae7..ffd8dac74a4 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -493,6 +493,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
state_ = Closing;
lock.unlock();
+ memoryLimitController_.close();
+
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() +
consumers.size());
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers
and " << consumers.size()
<< " consumers");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index d3ba47f3a80..63d2afc29bc 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -722,7 +722,10 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message&
msg) {
sendFlowPermitsToBroker(currentCnx, 1);
while (true) {
- incomingMessages_.pop(msg);
+ if (!incomingMessages_.pop(msg)) {
+ return ResultInterrupted;
+ }
+
{
// Lock needed to prevent race between connectionOpened and the
check "msg.impl_->cnx_ ==
// currentCnx.get())"
@@ -787,7 +790,10 @@ Result ConsumerImpl::receiveHelper(Message& msg) {
return fetchSingleMessageFromBroker(msg);
}
- incomingMessages_.pop(msg);
+ if (!incomingMessages_.pop(msg)) {
+ return ResultInterrupted;
+ }
+
messageProcessed(msg);
return ResultOk;
}
@@ -998,6 +1004,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
LOG_INFO(getName() << "Closing consumer for topic " << topic_);
state_ = Closing;
+ incomingMessages_.close();
// Flush pending grouped ACK requests.
if (ackGroupingTrackerPtr_) {
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc
b/pulsar-client-cpp/lib/ExecutorService.cc
index b9b5ed46478..a7390f19cea 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -38,13 +38,13 @@ void ExecutorService::start() {
if (self->isClosed()) {
return;
}
- LOG_INFO("Run io_service in a single thread");
+ LOG_DEBUG("Run io_service in a single thread");
boost::system::error_code ec;
self->getIOService().run(ec);
if (ec) {
LOG_ERROR("Failed to run io_service: " << ec.message());
} else {
- LOG_INFO("Event loop of ExecutorService exits successfully");
+ LOG_DEBUG("Event loop of ExecutorService exits successfully");
}
self->ioServiceDone_ = true;
self->cond_.notify_all();
diff --git a/pulsar-client-cpp/lib/MemoryLimitController.cc
b/pulsar-client-cpp/lib/MemoryLimitController.cc
index 4a23f8b1a1d..f55da8e68a4 100644
--- a/pulsar-client-cpp/lib/MemoryLimitController.cc
+++ b/pulsar-client-cpp/lib/MemoryLimitController.cc
@@ -45,16 +45,23 @@ bool MemoryLimitController::tryReserveMemory(uint64_t size)
{
}
}
-void MemoryLimitController::reserveMemory(uint64_t size) {
+bool MemoryLimitController::reserveMemory(uint64_t size) {
if (!tryReserveMemory(size)) {
std::unique_lock<std::mutex> lock(mutex_);
// Check again, while holding the lock, to ensure we reserve attempt
and the waiting for the condition
// are synchronized.
while (!tryReserveMemory(size)) {
+ if (isClosed_) {
+ // Interrupt the waiting if the client is closing
+ return false;
+ }
+
condition_.wait(lock);
}
}
+
+ return true;
}
void MemoryLimitController::releaseMemory(uint64_t size) {
@@ -70,4 +77,10 @@ void MemoryLimitController::releaseMemory(uint64_t size) {
uint64_t MemoryLimitController::currentUsage() const { return currentUsage_; }
+void MemoryLimitController::close() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ isClosed_ = true;
+ condition_.notify_all();
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MemoryLimitController.h
b/pulsar-client-cpp/lib/MemoryLimitController.h
index e70a248fed7..38987ea0b68 100644
--- a/pulsar-client-cpp/lib/MemoryLimitController.h
+++ b/pulsar-client-cpp/lib/MemoryLimitController.h
@@ -30,15 +30,18 @@ class MemoryLimitController {
public:
explicit MemoryLimitController(uint64_t memoryLimit);
bool tryReserveMemory(uint64_t size);
- void reserveMemory(uint64_t size);
+ bool reserveMemory(uint64_t size);
void releaseMemory(uint64_t size);
uint64_t currentUsage() const;
+ void close();
+
private:
const uint64_t memoryLimit_;
std::atomic<uint64_t> currentUsage_;
std::mutex mutex_;
std::condition_variable condition_;
+ bool isClosed_ = false;
};
} // namespace pulsar
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ae86d5879a..0ad9d606810 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -380,7 +380,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback
callback) {
return;
}
- // fail pending recieve
+ // fail pending receive
failPendingReceiveCallback();
}
@@ -436,8 +436,8 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer
consumer, const Message&
if (messages_.full()) {
lock.unlock();
}
- messages_.push(msg);
- if (messageListener_) {
+
+ if (messages_.push(msg) && messageListener_) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&MultiTopicsConsumerImpl::internalListener,
shared_from_this(), consumer));
@@ -520,6 +520,9 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback&
callback) {
void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
Message msg;
+
+ messages_.close();
+
Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e43b5090e43..23288e2f811 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -417,8 +417,7 @@ void PartitionedConsumerImpl::messageReceived(Consumer
consumer, const Message&
if (messages_.full()) {
lock.unlock();
}
- messages_.push(msg);
- if (messageListener_) {
+ if (messages_.push(msg) && messageListener_) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&PartitionedConsumerImpl::internalListener,
shared_from_this(), consumer));
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc
b/pulsar-client-cpp/lib/ProducerImpl.cc
index a539889ac00..229f7375c5e 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -525,16 +525,16 @@ int ProducerImpl::getNumOfChunks(uint32_t size, uint32_t
maxMessageSize) {
Result ProducerImpl::canEnqueueRequest(uint32_t payloadSize) {
if (conf_.getBlockIfQueueFull()) {
- if (semaphore_) {
- semaphore_->acquire();
+ if (semaphore_ && !semaphore_->acquire()) {
+ return ResultInterrupted;
+ }
+ if (!memoryLimitController_.reserveMemory(payloadSize)) {
+ return ResultInterrupted;
}
- memoryLimitController_.reserveMemory(payloadSize);
return ResultOk;
} else {
- if (semaphore_) {
- if (!semaphore_->tryAcquire()) {
- return ResultProducerQueueIsFull;
- }
+ if (semaphore_ && !semaphore_->tryAcquire()) {
+ return ResultProducerQueueIsFull;
}
if (!memoryLimitController_.tryReserveMemory(payloadSize)) {
@@ -646,6 +646,10 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
cancelTimers();
+ if (semaphore_) {
+ semaphore_->close();
+ }
+
// ensure any remaining send callbacks are called before calling the close
callback
failPendingMessages(ResultAlreadyClosed, false);
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 3feca2e83dc..d3322aa3805 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -159,6 +159,9 @@ const char* strResult(Result result) {
case ResultMemoryBufferIsFull:
return "ResultMemoryBufferIsFull";
+
+ case ResultInterrupted:
+ return "ResultInterrupted";
};
// NOTE : Do not add default case in the switch above. In future if we get
new cases for
// ServerError and miss them in the switch above we would like to get
notified. Adding
diff --git a/pulsar-client-cpp/lib/Semaphore.cc
b/pulsar-client-cpp/lib/Semaphore.cc
index e11bef28fe3..40d4e46cd1a 100644
--- a/pulsar-client-cpp/lib/Semaphore.cc
+++ b/pulsar-client-cpp/lib/Semaphore.cc
@@ -34,19 +34,25 @@ bool Semaphore::tryAcquire(int n) {
}
}
-void Semaphore::acquire(int n) {
+bool Semaphore::acquire(int n) {
std::unique_lock<std::mutex> lock(mutex_);
while (currentUsage_ + n > limit_) {
+ if (isClosed_) {
+ return false;
+ }
condition_.wait(lock);
}
currentUsage_ += n;
+ return true;
}
void Semaphore::release(int n) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::unique_lock<std::mutex> lock(mutex_);
currentUsage_ -= n;
+ lock.unlock();
+
if (n == 1) {
condition_.notify_one();
} else {
@@ -59,4 +65,10 @@ uint32_t Semaphore::currentUsage() const {
return currentUsage_;
}
+void Semaphore::close() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ isClosed_ = true;
+ condition_.notify_all();
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Semaphore.h
b/pulsar-client-cpp/lib/Semaphore.h
index 4186c4dfe79..dcef2ad5b74 100644
--- a/pulsar-client-cpp/lib/Semaphore.h
+++ b/pulsar-client-cpp/lib/Semaphore.h
@@ -31,15 +31,18 @@ class Semaphore {
explicit Semaphore(uint32_t limit);
bool tryAcquire(int n = 1);
- void acquire(int n = 1);
+ bool acquire(int n = 1);
void release(int n = 1);
uint32_t currentUsage() const;
+ void close();
+
private:
const uint32_t limit_;
uint32_t currentUsage_;
mutable std::mutex mutex_;
std::condition_variable condition_;
+ bool isClosed_ = false;
};
} // namespace pulsar
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
index 841be30e1e4..0f7fc2a33af 100644
--- a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
+++ b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
@@ -55,21 +55,18 @@ class UnboundedBlockingQueue {
}
}
- void pop() {
+ bool pop(T& value) {
Lock lock(mutex_);
// If the queue is empty, wait until an element is available to be
popped
queueEmptyCondition_.wait(lock,
QueueNotEmpty<UnboundedBlockingQueue<T> >(*this));
- queue_.pop_front();
- lock.unlock();
- }
- void pop(T& value) {
- Lock lock(mutex_);
- // If the queue is empty, wait until an element is available to be
popped
- queueEmptyCondition_.wait(lock,
QueueNotEmpty<UnboundedBlockingQueue<T> >(*this));
+ if (isEmptyNoMutex() || isClosedNoMutex()) {
+ return false;
+ }
+
value = queue_.front();
queue_.pop_front();
- lock.unlock();
+ return true;
}
template <typename Duration>
@@ -79,6 +76,10 @@ class UnboundedBlockingQueue {
return false;
}
+ if (isEmptyNoMutex() || isClosedNoMutex()) {
+ return false;
+ }
+
value = queue_.front();
queue_.pop_front();
lock.unlock();
@@ -133,12 +134,20 @@ class UnboundedBlockingQueue {
iterator end() { return queue_.end(); }
+ void close() {
+ Lock lock(mutex_);
+ closed_ = true;
+ queueEmptyCondition_.notify_all();
+ }
+
private:
bool isEmptyNoMutex() const { return queue_.empty(); }
+ bool isClosedNoMutex() const { return closed_; }
mutable std::mutex mutex_;
std::condition_variable queueEmptyCondition_;
Container queue_;
+ bool closed_ = false;
typedef std::unique_lock<std::mutex> Lock;
friend struct QueueNotEmpty<UnboundedBlockingQueue<T> >;
diff --git a/pulsar-client-cpp/python/src/enums.cc
b/pulsar-client-cpp/python/src/enums.cc
index 1b21af585ed..92f08a16847 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -84,7 +84,8 @@ void export_enums() {
.value("TransactionConflict", ResultTransactionConflict)
.value("TransactionNotFound", ResultTransactionNotFound)
.value("ProducerFenced", ResultProducerFenced)
- .value("MemoryBufferIsFull", ResultMemoryBufferIsFull);
+ .value("MemoryBufferIsFull", ResultMemoryBufferIsFull)
+ .value("Interrupted", pulsar::ResultInterrupted);
enum_<SchemaType>("SchemaType", "Supported schema types")
.value("NONE", pulsar::NONE)
diff --git a/pulsar-client-cpp/python/src/exceptions.cc
b/pulsar-client-cpp/python/src/exceptions.cc
index 8835c5fc86b..efca661b7c0 100644
--- a/pulsar-client-cpp/python/src/exceptions.cc
+++ b/pulsar-client-cpp/python/src/exceptions.cc
@@ -108,4 +108,5 @@ void export_exceptions() {
exceptions[ResultTransactionNotFound] =
createExceptionClass("TransactionNotFound", basePulsarException);
exceptions[ResultProducerFenced] = createExceptionClass("ProducerFenced",
basePulsarException);
exceptions[ResultMemoryBufferIsFull] =
createExceptionClass("MemoryBufferIsFull", basePulsarException);
+ exceptions[ResultInterrupted] = createExceptionClass("Interrupted",
basePulsarException);
}
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc
b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 42644e9389c..94b0a1bbdfd 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>
#include <lib/BlockingQueue.h>
+#include <lib/Latch.h>
#include <future>
#include <iostream>
@@ -152,72 +153,6 @@ TEST(BlockingQueueTest, testTimeout) {
ASSERT_FALSE(popReturn);
}
-TEST(BlockingQueueTest, testReservedSpot) {
- size_t size = 3;
- BlockingQueue<int> queue(size);
-
- ASSERT_TRUE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(0, queue.size());
-
- queue.push(1);
- ASSERT_FALSE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(1, queue.size());
-
- BlockingQueue<int>::ReservedSpot spot1 = queue.reserve();
- ASSERT_FALSE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(1, queue.size());
-
- queue.push(2, spot1);
-
- ASSERT_FALSE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(2, queue.size());
-
- {
- BlockingQueue<int>::ReservedSpot spot2 = queue.reserve();
-
- ASSERT_FALSE(queue.empty());
- ASSERT_TRUE(queue.full());
- ASSERT_EQ(2, queue.size());
- }
-
- ASSERT_FALSE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(2, queue.size());
-
- BlockingQueue<int>::ReservedSpot spot3 = queue.reserve();
-
- int res;
- queue.pop(res);
- ASSERT_EQ(1, res);
-
- ASSERT_FALSE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(1, queue.size());
-
- queue.pop(res);
- ASSERT_EQ(2, res);
-
- ASSERT_TRUE(queue.empty());
- ASSERT_FALSE(queue.full());
- ASSERT_EQ(0, queue.size());
-
- spot3.release();
-
- {
- BlockingQueue<int>::ReservedSpot spot1 = queue.reserve();
- BlockingQueue<int>::ReservedSpot spot2 = queue.reserve();
- BlockingQueue<int>::ReservedSpot spot3 = queue.reserve();
-
- ASSERT_TRUE(queue.empty());
- ASSERT_TRUE(queue.full());
- ASSERT_EQ(0, queue.size());
- }
-}
-
TEST(BlockingQueueTest, testPushPopRace) {
auto test_logic = []() {
size_t size = 5;
@@ -254,3 +189,49 @@ TEST(BlockingQueueTest, testPushPopRace) {
ASSERT_EXIT(test_logic(), ::testing::ExitedWithCode(0), "Exiting");
}
+
+TEST(BlockingQueueTest, testCloseInterruptOnEmpty) {
+ BlockingQueue<int> queue(10);
+ pulsar::Latch latch(1);
+
+ auto thread = std::thread([&]() {
+ int v;
+ bool res = queue.pop(v);
+ ASSERT_FALSE(res);
+ latch.countdown();
+ });
+
+ // Sleep to allow for background thread to call pop and be blocked there
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ queue.close();
+ bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+ ASSERT_TRUE(wasUnblocked);
+ thread.join();
+}
+
+TEST(BlockingQueueTest, testCloseInterruptOnFull) {
+ BlockingQueue<int> queue(10);
+ pulsar::Latch latch(1);
+
+ auto thread = std::thread([&]() {
+ int i = 0;
+ while (true) {
+ bool res = queue.push(i++);
+ if (!res) {
+ latch.countdown();
+ return;
+ }
+ }
+ });
+
+ // Sleep to allow for background thread to fill the queue and be blocked
there
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ queue.close();
+ bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+ ASSERT_TRUE(wasUnblocked);
+ thread.join();
+}
diff --git a/pulsar-client-cpp/tests/SemaphoreTest.cc
b/pulsar-client-cpp/tests/SemaphoreTest.cc
index c62fff3dfb4..0cdec79feda 100644
--- a/pulsar-client-cpp/tests/SemaphoreTest.cc
+++ b/pulsar-client-cpp/tests/SemaphoreTest.cc
@@ -126,4 +126,25 @@ TEST(SemaphoreTest, testSingleRelease) {
t1.join();
t2.join();
t3.join();
-}
\ No newline at end of file
+}
+
+TEST(SemaphoreTest, testCloseInterruptOnFull) {
+ Semaphore s(100);
+ s.acquire(100);
+ Latch latch(1);
+
+ auto thread = std::thread([&]() {
+ bool res = s.acquire(1);
+ ASSERT_FALSE(res);
+ latch.countdown();
+ });
+
+ // Sleep to allow for background thread to fill the queue and be blocked
there
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ s.close();
+ bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+ ASSERT_TRUE(wasUnblocked);
+ thread.join();
+}
diff --git a/pulsar-client-cpp/tests/UnboundedBlockingQueueTest.cc
b/pulsar-client-cpp/tests/UnboundedBlockingQueueTest.cc
new file mode 100644
index 00000000000..819c22e59ab
--- /dev/null
+++ b/pulsar-client-cpp/tests/UnboundedBlockingQueueTest.cc
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/UnboundedBlockingQueue.h>
+#include <lib/Latch.h>
+
+#include <future>
+#include <thread>
+
+class UnboundedProducerWorker {
+ private:
+ std::thread producerThread_;
+ UnboundedBlockingQueue<int>& queue_;
+
+ public:
+ UnboundedProducerWorker(UnboundedBlockingQueue<int>& queue) :
queue_(queue) {}
+
+ void produce(int number) {
+ producerThread_ = std::thread(&UnboundedProducerWorker::pushNumbers,
this, number);
+ }
+
+ void pushNumbers(int number) {
+ for (int i = 1; i <= number; i++) {
+ queue_.push(i);
+ }
+ }
+
+ void join() { producerThread_.join(); }
+};
+
+class UnboundedConsumerWorker {
+ private:
+ std::thread consumerThread_;
+ UnboundedBlockingQueue<int>& queue_;
+
+ public:
+ UnboundedConsumerWorker(UnboundedBlockingQueue<int>& queue) :
queue_(queue) {}
+
+ void consume(int number) {
+ consumerThread_ = std::thread(&UnboundedConsumerWorker::popNumbers,
this, number);
+ }
+
+ void popNumbers(int number) {
+ for (int i = 1; i <= number; i++) {
+ int poppedElement;
+ queue_.pop(poppedElement);
+ }
+ }
+
+ void join() { consumerThread_.join(); }
+};
+
+TEST(UnboundedBlockingQueueTest, testBasic) {
+ size_t size = 5;
+ UnboundedBlockingQueue<int> queue(size);
+
+ UnboundedProducerWorker producerWorker(queue);
+ producerWorker.produce(5);
+
+ UnboundedConsumerWorker consumerWorker(queue);
+ consumerWorker.consume(5);
+
+ producerWorker.join();
+ consumerWorker.join();
+
+ size_t zero = 0;
+ ASSERT_EQ(zero, queue.size());
+}
+
+TEST(UnboundedBlockingQueueTest, testQueueOperations) {
+ size_t size = 5;
+ UnboundedBlockingQueue<int> queue(size);
+ for (size_t i = 1; i <= size; i++) {
+ queue.push(i);
+ }
+ ASSERT_EQ(queue.size(), size);
+
+ int cnt = 1;
+ for (BlockingQueue<int>::const_iterator it = queue.begin(); it !=
queue.end(); it++) {
+ ASSERT_EQ(cnt, *it);
+ ++cnt;
+ }
+
+ cnt = 1;
+ for (BlockingQueue<int>::iterator it = queue.begin(); it != queue.end();
it++) {
+ ASSERT_EQ(cnt, *it);
+ ++cnt;
+ }
+
+ int poppedElement;
+ for (size_t i = 1; i <= size; i++) {
+ queue.pop(poppedElement);
+ }
+
+ ASSERT_FALSE(queue.peek(poppedElement));
+}
+
+TEST(UnboundedBlockingQueueTest, testBlockingProducer) {
+ size_t size = 5;
+ UnboundedBlockingQueue<int> queue(size);
+
+ UnboundedProducerWorker producerWorker(queue);
+ producerWorker.produce(8);
+
+ UnboundedConsumerWorker consumerWorker(queue);
+ consumerWorker.consume(5);
+
+ producerWorker.join();
+ consumerWorker.join();
+
+ size_t three = 3;
+ ASSERT_EQ(three, queue.size());
+}
+
+TEST(UnboundedBlockingQueueTest, testBlockingConsumer) {
+ size_t size = 5;
+ UnboundedBlockingQueue<int> queue(size);
+
+ UnboundedProducerWorker producerWorker(queue);
+ producerWorker.produce(5);
+
+ UnboundedConsumerWorker consumerWorker(queue);
+ consumerWorker.consume(8);
+
+ producerWorker.pushNumbers(3);
+
+ producerWorker.join();
+ consumerWorker.join();
+
+ size_t zero = 0;
+ ASSERT_EQ(zero, queue.size());
+}
+
+TEST(UnboundedBlockingQueueTest, testTimeout) {
+ size_t size = 5;
+ UnboundedBlockingQueue<int> queue(size);
+ int value;
+ bool popReturn = queue.pop(value, std::chrono::seconds(1));
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ ASSERT_FALSE(popReturn);
+}
+
+TEST(UnboundedBlockingQueueTest, testCloseInterruptOnEmpty) {
+ UnboundedBlockingQueue<int> queue(10);
+ pulsar::Latch latch(1);
+
+ auto thread = std::thread([&]() {
+ int v;
+ bool res = queue.pop(v);
+ ASSERT_FALSE(res);
+ latch.countdown();
+ });
+
+ // Sleep to allow for background thread to call pop and be blocked there
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ queue.close();
+ bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+ ASSERT_TRUE(wasUnblocked);
+ thread.join();
+}