This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b40749dd4d [C++] Unblock all threads when Pulsar client is closed (#15726)
6b40749dd4d is described below

commit 6b40749dd4d6be5ea93fc987986a65209c5a8a56
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 24 10:44:20 2022 -0700

    [C++] Unblock all threads when Pulsar client is closed (#15726)
    
    * [C++] Unblock all threads when Pulsar client is closed
    
    * Fixed formatting
    
    * Added unit tests for UnboundedBlockingQueue
    
    * Addressed comments
    
    * Fixed indentation
---
 pulsar-client-cpp/include/pulsar/Result.h          |   2 +
 pulsar-client-cpp/include/pulsar/c/result.h        |   3 +-
 pulsar-client-cpp/lib/BlockingQueue.h              | 155 +++---------------
 pulsar-client-cpp/lib/ClientImpl.cc                |   2 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  11 +-
 pulsar-client-cpp/lib/ExecutorService.cc           |   4 +-
 pulsar-client-cpp/lib/MemoryLimitController.cc     |  15 +-
 pulsar-client-cpp/lib/MemoryLimitController.h      |   5 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |   9 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |   3 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              |  18 ++-
 pulsar-client-cpp/lib/Result.cc                    |   3 +
 pulsar-client-cpp/lib/Semaphore.cc                 |  16 +-
 pulsar-client-cpp/lib/Semaphore.h                  |   5 +-
 pulsar-client-cpp/lib/UnboundedBlockingQueue.h     |  27 ++--
 pulsar-client-cpp/python/src/enums.cc              |   3 +-
 pulsar-client-cpp/python/src/exceptions.cc         |   1 +
 pulsar-client-cpp/tests/BlockingQueueTest.cc       | 113 ++++++-------
 pulsar-client-cpp/tests/SemaphoreTest.cc           |  23 ++-
 .../tests/UnboundedBlockingQueueTest.cc            | 178 +++++++++++++++++++++
 20 files changed, 367 insertions(+), 229 deletions(-)

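The changes below all apply the same shutdown pattern: each blocking primitive
(BlockingQueue, UnboundedBlockingQueue, Semaphore, MemoryLimitController) gains
a closed flag that is checked inside its wait predicate, plus a close() method
that sets the flag under the lock and calls notify_all(), so every thread
blocked in a wait wakes up and reports failure instead of hanging forever. A
minimal standalone sketch of the pattern (hypothetical ClosableQueue class, not
the Pulsar code itself):

    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>

    // Hypothetical sketch of the close-flag + notify_all pattern.
    template <typename T>
    class ClosableQueue {
       public:
        explicit ClosableQueue(size_t maxSize) : maxSize_(maxSize) {}

        // Returns false if the queue was closed while waiting for space.
        bool push(const T& value) {
            std::unique_lock<std::mutex> lock(mutex_);
            notFull_.wait(lock, [this] { return queue_.size() < maxSize_ || closed_; });
            if (closed_) {
                return false;
            }
            queue_.push_back(value);
            notEmpty_.notify_all();
            return true;
        }

        // Returns false if the queue was closed while waiting for an element.
        bool pop(T& value) {
            std::unique_lock<std::mutex> lock(mutex_);
            notEmpty_.wait(lock, [this] { return !queue_.empty() || closed_; });
            if (closed_) {
                return false;
            }
            value = queue_.front();
            queue_.pop_front();
            notFull_.notify_all();
            return true;
        }

        // Wakes up every thread blocked in push() or pop().
        void close() {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
            notEmpty_.notify_all();
            notFull_.notify_all();
        }

       private:
        const size_t maxSize_;
        std::mutex mutex_;
        std::condition_variable notEmpty_;
        std::condition_variable notFull_;
        std::deque<T> queue_;
        bool closed_ = false;
    };

The interrupted waiters surface the failure to callers as the new
ResultInterrupted code introduced in Result.h below.
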
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index 91f68584955..a7fdd04f99a 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -88,6 +88,8 @@ enum Result
     ResultProducerFenced,                            /// Producer was fenced by broker
 
     ResultMemoryBufferIsFull,  /// Client-wide memory limit has been reached
+
+    ResultInterrupted,  /// Interrupted while waiting to dequeue
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/include/pulsar/c/result.h b/pulsar-client-cpp/include/pulsar/c/result.h
index ea9321cfd84..a95d1e5862f 100644
--- a/pulsar-client-cpp/include/pulsar/c/result.h
+++ b/pulsar-client-cpp/include/pulsar/c/result.h
@@ -90,7 +90,8 @@ typedef enum
     pulsar_result_TransactionNotFound,                  /// Transaction not found
     pulsar_result_ProducerFenced,                       /// Producer was fenced by broker
 
-    pulsar_result_MemoryBufferIsFull  /// Client-wide memory limit has been reached
+    pulsar_result_MemoryBufferIsFull,  /// Client-wide memory limit has been reached
+    pulsar_result_Interrupted,         /// Interrupted while waiting to dequeue
 } pulsar_result;
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 2814c4f20c5..d09166fdf26 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -33,14 +33,14 @@ template <typename Container>
 struct QueueNotEmpty {
     const Container& queue_;
     QueueNotEmpty(const Container& queue) : queue_(queue) {}
-    bool operator()() const { return !queue_.isEmptyNoMutex(); }
+    bool operator()() const { return !queue_.isEmptyNoMutex() || queue_.isClosedNoMutex(); }
 };
 
 template <typename Container>
 struct QueueNotFull {
     const Container& queue_;
     QueueNotFull(const Container& queue) : queue_(queue) {}
-    bool operator()() const { return !queue_.isFullNoMutex(); }
+    bool operator()() const { return !queue_.isFullNoMutex() || queue_.isClosedNoMutex(); }
 };
 
 template <typename T>
@@ -50,117 +50,21 @@ class BlockingQueue {
     typedef typename Container::iterator iterator;
     typedef typename Container::const_iterator const_iterator;
 
-    class ReservedSpot {
-       public:
-        ReservedSpot() : queue_(), released_(true) {}
+    BlockingQueue(size_t maxSize) : maxSize_(maxSize), mutex_(), queue_(maxSize) {}
 
-        ReservedSpot(BlockingQueue<T>& queue) : queue_(&queue), released_(false) {}
-
-        ~ReservedSpot() { release(); }
-
-        void release() {
-            if (!released_) {
-                queue_->releaseReservedSpot();
-                released_ = true;
-            }
-        }
-
-       private:
-        BlockingQueue<T>* queue_;
-        bool released_;
-
-        friend class BlockingQueue<T>;
-    };
-
-    BlockingQueue(size_t maxSize) : maxSize_(maxSize), mutex_(), queue_(maxSize), reservedSpots_(0) {}
-
-    bool tryReserve(size_t noOfSpots) {
-        assert(noOfSpots <= maxSize_);
-        Lock lock(mutex_);
-        if (noOfSpots <= maxSize_ - (reservedSpots_ + queue_.size())) {
-            reservedSpots_ += noOfSpots;
-            return true;
-        }
-        return false;
-    }
-
-    void reserve(size_t noOfSpots) {
-        assert(noOfSpots <= maxSize_);
-        Lock lock(mutex_);
-        while (noOfSpots--) {
-            queueFullCondition.wait(lock, QueueNotFull<BlockingQueue<T> >(*this));
-            reservedSpots_++;
-        }
-    }
-
-    void release(size_t noOfSpots) {
-        Lock lock(mutex_);
-        assert(noOfSpots <= reservedSpots_);
-        bool wasFull = isFullNoMutex();
-        reservedSpots_ -= noOfSpots;
-        lock.unlock();
-
-        if (wasFull) {
-            // Notify that one spot is now available
-            queueFullCondition.notify_all();
-        }
-    }
-
-    ReservedSpot reserve() {
+    bool push(const T& value) {
         Lock lock(mutex_);
-        // If the queue is full, wait for space to be available
-        queueFullCondition.wait(lock, QueueNotFull<BlockingQueue<T> >(*this));
-        reservedSpots_++;
-        return ReservedSpot(*this);
-    }
 
-    void push(const T& value, bool wasReserved = false) {
-        Lock lock(mutex_);
-        if (wasReserved) {
-            reservedSpots_--;
-        }
         // If the queue is full, wait for space to be available
         queueFullCondition.wait(lock, QueueNotFull<BlockingQueue<T> >(*this));
-        bool wasEmpty = queue_.empty();
-        queue_.push_back(value);
-        lock.unlock();
-        if (wasEmpty) {
-            // Notify that an element is pushed
-            queueEmptyCondition.notify_all();
-        }
-    }
-
-    void push(const T& value, ReservedSpot& spot) {
-        Lock lock(mutex_);
-
-        // Since the value already had a spot reserved in the queue, we need to
-        // discount it
-        assert(reservedSpots_ > 0);
-        reservedSpots_--;
-        spot.released_ = true;
-
-        bool wasEmpty = queue_.empty();
-        queue_.push_back(value);
-        lock.unlock();
 
-        if (wasEmpty) {
-            // Notify that an element is pushed
-            queueEmptyCondition.notify_all();
-        }
-    }
-
-    bool tryPush(const T& value) {
-        Lock lock(mutex_);
-
-        // Need to consider queue_.size() + reserved spot
-        if (isFullNoMutex()) {
+        if (isClosedNoMutex()) {
             return false;
         }
 
         bool wasEmpty = queue_.empty();
         queue_.push_back(value);
         lock.unlock();
-
         if (wasEmpty) {
             // Notify that an element is pushed
             queueEmptyCondition.notify_all();
@@ -169,35 +73,28 @@ class BlockingQueue {
         return true;
     }
 
-    void pop() {
+    bool pop(T& value) {
         Lock lock(mutex_);
+
         // If the queue is empty, wait until an element is available to be popped
         queueEmptyCondition.wait(lock, QueueNotEmpty<BlockingQueue<T> >(*this));
 
-        bool wasFull = isFullNoMutex();
-        queue_.pop_front();
-        lock.unlock();
-
-        if (wasFull) {
-            // Notify that an element is popped
-            queueFullCondition.notify_all();
+        if (isClosedNoMutex()) {
+            return false;
         }
-    }
 
-    void pop(T& value) {
-        Lock lock(mutex_);
-        // If the queue is empty, wait until an element is available to be popped
-        queueEmptyCondition.wait(lock, QueueNotEmpty<BlockingQueue<T> >(*this));
         value = queue_.front();
-
         bool wasFull = isFullNoMutex();
         queue_.pop_front();
+
         lock.unlock();
 
         if (wasFull) {
             // Notify that an element is popped
             queueFullCondition.notify_all();
         }
+
+        return true;
     }
 
     template <typename Duration>
@@ -207,6 +104,10 @@ class BlockingQueue {
             return false;
         }
 
+        if (isClosedNoMutex()) {
+            return false;
+        }
+
         bool wasFull = isFullNoMutex();
         value = queue_.front();
         queue_.pop_front();
@@ -263,34 +164,28 @@ class BlockingQueue {
 
     iterator end() { return queue_.end(); }
 
-    int reservedSpots() const { return reservedSpots_; }
-
-   private:
-    void releaseReservedSpot() {
+    void close() {
         Lock lock(mutex_);
-        bool wasFull = isFullNoMutex();
-        --reservedSpots_;
-        lock.unlock();
-
-        if (wasFull) {
-            // Notify that one spot is now available
-            queueFullCondition.notify_all();
-        }
+        isClosed_ = true;
+        queueEmptyCondition.notify_all();
+        queueFullCondition.notify_all();
     }
 
+   private:
     bool isEmptyNoMutex() const { return queue_.empty(); }
 
-    bool isFullNoMutex() const { return (queue_.size() + reservedSpots_) == maxSize_; }
+    bool isFullNoMutex() const { return queue_.size() == maxSize_; }
+
+    bool isClosedNoMutex() const { return isClosed_; }
 
     const size_t maxSize_;
     mutable std::mutex mutex_;
     std::condition_variable queueFullCondition;
     std::condition_variable queueEmptyCondition;
     Container queue_;
-    int reservedSpots_;
+    bool isClosed_ = false;
 
     typedef std::unique_lock<std::mutex> Lock;
-    friend class QueueReservedSpot;
     friend struct QueueNotEmpty<BlockingQueue<T> >;
     friend struct QueueNotFull<BlockingQueue<T> >;
 };
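
Since push() and pop() now report whether the queue is still open, the typical
caller becomes a loop that terminates once close() is invoked. A usage sketch
(assumed caller code, not part of this commit):

    #include <lib/BlockingQueue.h>

    #include <iostream>
    #include <thread>

    // Assumed caller code: a worker drains the queue and exits once close()
    // is called, instead of blocking forever on an empty queue.
    void drainUntilClosed(BlockingQueue<int>& queue) {
        std::thread worker([&queue] {
            int item;
            // pop() returns false once the queue has been closed
            while (queue.pop(item)) {
                std::cout << "processed " << item << std::endl;
            }
        });

        queue.close();  // during shutdown: wakes the worker even when empty
        worker.join();  // completes instead of hanging
    }
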
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 931fadeaae7..ffd8dac74a4 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -493,6 +493,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
     state_ = Closing;
     lock.unlock();
 
+    memoryLimitController_.close();
+
+    SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
     LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
                                            << " consumers");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index d3ba47f3a80..63d2afc29bc 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -722,7 +722,10 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
     sendFlowPermitsToBroker(currentCnx, 1);
 
     while (true) {
-        incomingMessages_.pop(msg);
+        if (!incomingMessages_.pop(msg)) {
+            return ResultInterrupted;
+        }
+
         {
             // Lock needed to prevent race between connectionOpened and the check "msg.impl_->cnx_ ==
             // currentCnx.get())"
@@ -787,7 +790,10 @@ Result ConsumerImpl::receiveHelper(Message& msg) {
         return fetchSingleMessageFromBroker(msg);
     }
 
-    incomingMessages_.pop(msg);
+    if (!incomingMessages_.pop(msg)) {
+        return ResultInterrupted;
+    }
+
     messageProcessed(msg);
     return ResultOk;
 }
@@ -998,6 +1004,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
     LOG_INFO(getName() << "Closing consumer for topic " << topic_);
     state_ = Closing;
+    incomingMessages_.close();
 
     // Flush pending grouped ACK requests.
     if (ackGroupingTrackerPtr_) {
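
From the application's point of view, a blocking Consumer::receive() can now
return ResultInterrupted when the client is closed from another thread. A
sketch of a receive loop that handles this (assumed application code, not part
of this commit):

    #include <pulsar/Client.h>

    // Assumed application-side receive loop: stops cleanly when the client is
    // closed concurrently and the blocking receive() is interrupted.
    void receiveLoop(pulsar::Consumer& consumer) {
        while (true) {
            pulsar::Message msg;
            pulsar::Result res = consumer.receive(msg);
            if (res == pulsar::ResultOk) {
                consumer.acknowledge(msg);
            } else if (res == pulsar::ResultInterrupted ||
                       res == pulsar::ResultAlreadyClosed) {
                break;  // client/consumer is shutting down
            }
        }
    }
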
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index b9b5ed46478..a7390f19cea 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -38,13 +38,13 @@ void ExecutorService::start() {
         if (self->isClosed()) {
             return;
         }
-        LOG_INFO("Run io_service in a single thread");
+        LOG_DEBUG("Run io_service in a single thread");
         boost::system::error_code ec;
         self->getIOService().run(ec);
         if (ec) {
             LOG_ERROR("Failed to run io_service: " << ec.message());
         } else {
-            LOG_INFO("Event loop of ExecutorService exits successfully");
+            LOG_DEBUG("Event loop of ExecutorService exits successfully");
         }
         self->ioServiceDone_ = true;
         self->cond_.notify_all();
diff --git a/pulsar-client-cpp/lib/MemoryLimitController.cc b/pulsar-client-cpp/lib/MemoryLimitController.cc
index 4a23f8b1a1d..f55da8e68a4 100644
--- a/pulsar-client-cpp/lib/MemoryLimitController.cc
+++ b/pulsar-client-cpp/lib/MemoryLimitController.cc
@@ -45,16 +45,23 @@ bool MemoryLimitController::tryReserveMemory(uint64_t size) {
     }
 }
 
-void MemoryLimitController::reserveMemory(uint64_t size) {
+bool MemoryLimitController::reserveMemory(uint64_t size) {
     if (!tryReserveMemory(size)) {
         std::unique_lock<std::mutex> lock(mutex_);
 
         // Check again, while holding the lock, to ensure we reserve attempt and the waiting for the condition
         // are synchronized.
         while (!tryReserveMemory(size)) {
+            if (isClosed_) {
+                // Interrupt the waiting if the client is closing
+                return false;
+            }
+
             condition_.wait(lock);
         }
     }
+
+    return true;
 }
 
 void MemoryLimitController::releaseMemory(uint64_t size) {
@@ -70,4 +77,10 @@ void MemoryLimitController::releaseMemory(uint64_t size) {
 
 uint64_t MemoryLimitController::currentUsage() const { return currentUsage_; }
 
+void MemoryLimitController::close() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    isClosed_ = true;
+    condition_.notify_all();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MemoryLimitController.h b/pulsar-client-cpp/lib/MemoryLimitController.h
index e70a248fed7..38987ea0b68 100644
--- a/pulsar-client-cpp/lib/MemoryLimitController.h
+++ b/pulsar-client-cpp/lib/MemoryLimitController.h
@@ -30,15 +30,18 @@ class MemoryLimitController {
    public:
     explicit MemoryLimitController(uint64_t memoryLimit);
     bool tryReserveMemory(uint64_t size);
-    void reserveMemory(uint64_t size);
+    bool reserveMemory(uint64_t size);
     void releaseMemory(uint64_t size);
     uint64_t currentUsage() const;
 
+    void close();
+
    private:
     const uint64_t memoryLimit_;
     std::atomic<uint64_t> currentUsage_;
     std::mutex mutex_;
     std::condition_variable condition_;
+    bool isClosed_ = false;
 };
 
 }  // namespace pulsar
\ No newline at end of file
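
Because reserveMemory() now returns bool instead of void, callers have to check
the result: false means the controller was closed while the thread was waiting,
not that memory became available. A minimal sketch of the blocked-then-interrupted
case (hypothetical standalone snippet mirroring the new contract):

    #include <lib/MemoryLimitController.h>

    #include <cassert>
    #include <thread>

    // Hypothetical snippet: a thread blocked in reserveMemory() returns false
    // once close() is called, instead of waiting forever.
    void memoryLimitCloseExample() {
        pulsar::MemoryLimitController controller(100);
        controller.tryReserveMemory(100);  // exhaust the limit

        std::thread blocked([&controller] {
            bool reserved = controller.reserveMemory(1);  // blocks: limit is full
            assert(!reserved);  // interrupted by close(), not satisfied
        });

        controller.close();  // wake the blocked thread
        blocked.join();
    }
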
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ae86d5879a..0ad9d606810 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -380,7 +380,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    // fail pending recieve
+    // fail pending receive
     failPendingReceiveCallback();
 }
 
@@ -436,8 +436,8 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         if (messages_.full()) {
             lock.unlock();
         }
-        messages_.push(msg);
-        if (messageListener_) {
+
+        if (messages_.push(msg) && messageListener_) {
             unAckedMessageTrackerPtr_->add(msg.getMessageId());
             listenerExecutor_->postWork(
                 std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
@@ -520,6 +520,9 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 
 void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     Message msg;
+
+    messages_.close();
+
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e43b5090e43..23288e2f811 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -417,8 +417,7 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
         if (messages_.full()) {
             lock.unlock();
         }
-        messages_.push(msg);
-        if (messageListener_) {
+        if (messages_.push(msg) && messageListener_) {
             unAckedMessageTrackerPtr_->add(msg.getMessageId());
             listenerExecutor_->postWork(
                 std::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index a539889ac00..229f7375c5e 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -525,16 +525,16 @@ int ProducerImpl::getNumOfChunks(uint32_t size, uint32_t maxMessageSize) {
 
 Result ProducerImpl::canEnqueueRequest(uint32_t payloadSize) {
     if (conf_.getBlockIfQueueFull()) {
-        if (semaphore_) {
-            semaphore_->acquire();
+        if (semaphore_ && !semaphore_->acquire()) {
+            return ResultInterrupted;
+        }
+        if (!memoryLimitController_.reserveMemory(payloadSize)) {
+            return ResultInterrupted;
         }
-        memoryLimitController_.reserveMemory(payloadSize);
         return ResultOk;
     } else {
-        if (semaphore_) {
-            if (!semaphore_->tryAcquire()) {
-                return ResultProducerQueueIsFull;
-            }
+        if (semaphore_ && !semaphore_->tryAcquire()) {
+            return ResultProducerQueueIsFull;
         }
 
         if (!memoryLimitController_.tryReserveMemory(payloadSize)) {
@@ -646,6 +646,10 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
     cancelTimers();
 
+    if (semaphore_) {
+        semaphore_->close();
+    }
+
+    // ensure any remaining send callbacks are called before calling the close callback
     failPendingMessages(ResultAlreadyClosed, false);
 
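On the producer side, the visible effect is that a send() blocked because the
queue is full (with blockIfQueueFull enabled) or because the client-wide memory
limit is reached now fails with ResultInterrupted when the client is closed,
rather than hanging. A sketch of handling it (assumed application code):

    #include <pulsar/Client.h>

    #include <iostream>
    #include <string>

    // Assumed application-side code: with blockIfQueueFull enabled, a blocked
    // send() now fails with ResultInterrupted when the client is closed from
    // another thread.
    void sendOne(pulsar::Producer& producer, const std::string& payload) {
        pulsar::Message msg = pulsar::MessageBuilder().setContent(payload).build();
        pulsar::Result res = producer.send(msg);
        if (res == pulsar::ResultInterrupted || res == pulsar::ResultAlreadyClosed) {
            std::cerr << "send aborted: client is shutting down" << std::endl;
        } else if (res == pulsar::ResultProducerQueueIsFull) {
            std::cerr << "producer queue full (blockIfQueueFull disabled)" << std::endl;
        }
    }
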
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 3feca2e83dc..d3322aa3805 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -159,6 +159,9 @@ const char* strResult(Result result) {
 
         case ResultMemoryBufferIsFull:
             return "ResultMemoryBufferIsFull";
+
+        case ResultInterrupted:
+            return "ResultInterrupted";
     };
 // NOTE : Do not add default case in the switch above. In future if we get new cases for
 // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Semaphore.cc b/pulsar-client-cpp/lib/Semaphore.cc
index e11bef28fe3..40d4e46cd1a 100644
--- a/pulsar-client-cpp/lib/Semaphore.cc
+++ b/pulsar-client-cpp/lib/Semaphore.cc
@@ -34,19 +34,25 @@ bool Semaphore::tryAcquire(int n) {
     }
 }
 
-void Semaphore::acquire(int n) {
+bool Semaphore::acquire(int n) {
     std::unique_lock<std::mutex> lock(mutex_);
 
     while (currentUsage_ + n > limit_) {
+        if (isClosed_) {
+            return false;
+        }
         condition_.wait(lock);
     }
 
     currentUsage_ += n;
+    return true;
 }
 
 void Semaphore::release(int n) {
-    std::lock_guard<std::mutex> lock(mutex_);
+    std::unique_lock<std::mutex> lock(mutex_);
     currentUsage_ -= n;
+    lock.unlock();
+
     if (n == 1) {
         condition_.notify_one();
     } else {
@@ -59,4 +65,10 @@ uint32_t Semaphore::currentUsage() const {
     return currentUsage_;
 }
 
+void Semaphore::close() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    isClosed_ = true;
+    condition_.notify_all();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Semaphore.h b/pulsar-client-cpp/lib/Semaphore.h
index 4186c4dfe79..dcef2ad5b74 100644
--- a/pulsar-client-cpp/lib/Semaphore.h
+++ b/pulsar-client-cpp/lib/Semaphore.h
@@ -31,15 +31,18 @@ class Semaphore {
     explicit Semaphore(uint32_t limit);
 
     bool tryAcquire(int n = 1);
-    void acquire(int n = 1);
+    bool acquire(int n = 1);
     void release(int n = 1);
     uint32_t currentUsage() const;
 
+    void close();
+
    private:
     const uint32_t limit_;
     uint32_t currentUsage_;
     mutable std::mutex mutex_;
     std::condition_variable condition_;
+    bool isClosed_ = false;
 };
 
 }  // namespace pulsar
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
index 841be30e1e4..0f7fc2a33af 100644
--- a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
+++ b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
@@ -55,21 +55,18 @@ class UnboundedBlockingQueue {
         }
     }
 
-    void pop() {
+    bool pop(T& value) {
         Lock lock(mutex_);
         // If the queue is empty, wait until an element is available to be popped
         queueEmptyCondition_.wait(lock, QueueNotEmpty<UnboundedBlockingQueue<T> >(*this));
-        queue_.pop_front();
-        lock.unlock();
-    }
 
-    void pop(T& value) {
-        Lock lock(mutex_);
-        // If the queue is empty, wait until an element is available to be popped
-        queueEmptyCondition_.wait(lock, QueueNotEmpty<UnboundedBlockingQueue<T> >(*this));
+        if (isEmptyNoMutex() || isClosedNoMutex()) {
+            return false;
+        }
+
         value = queue_.front();
         queue_.pop_front();
-        lock.unlock();
+        return true;
     }
 
     template <typename Duration>
@@ -79,6 +76,10 @@ class UnboundedBlockingQueue {
             return false;
         }
 
+        if (isEmptyNoMutex() || isClosedNoMutex()) {
+            return false;
+        }
+
         value = queue_.front();
         queue_.pop_front();
         lock.unlock();
@@ -133,12 +134,20 @@ class UnboundedBlockingQueue {
 
     iterator end() { return queue_.end(); }
 
+    void close() {
+        Lock lock(mutex_);
+        closed_ = true;
+        queueEmptyCondition_.notify_all();
+    }
+
    private:
     bool isEmptyNoMutex() const { return queue_.empty(); }
+    bool isClosedNoMutex() const { return closed_; }
 
     mutable std::mutex mutex_;
     std::condition_variable queueEmptyCondition_;
     Container queue_;
+    bool closed_ = false;
 
     typedef std::unique_lock<std::mutex> Lock;
     friend struct QueueNotEmpty<UnboundedBlockingQueue<T> >;
diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc
index 1b21af585ed..92f08a16847 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -84,7 +84,8 @@ void export_enums() {
         .value("TransactionConflict", ResultTransactionConflict)
         .value("TransactionNotFound", ResultTransactionNotFound)
         .value("ProducerFenced", ResultProducerFenced)
-        .value("MemoryBufferIsFull", ResultMemoryBufferIsFull);
+        .value("MemoryBufferIsFull", ResultMemoryBufferIsFull)
+        .value("Interrupted", pulsar::ResultInterrupted);
 
     enum_<SchemaType>("SchemaType", "Supported schema types")
         .value("NONE", pulsar::NONE)
diff --git a/pulsar-client-cpp/python/src/exceptions.cc b/pulsar-client-cpp/python/src/exceptions.cc
index 8835c5fc86b..efca661b7c0 100644
--- a/pulsar-client-cpp/python/src/exceptions.cc
+++ b/pulsar-client-cpp/python/src/exceptions.cc
@@ -108,4 +108,5 @@ void export_exceptions() {
     exceptions[ResultTransactionNotFound] = createExceptionClass("TransactionNotFound", basePulsarException);
     exceptions[ResultProducerFenced] = createExceptionClass("ProducerFenced", basePulsarException);
     exceptions[ResultMemoryBufferIsFull] = createExceptionClass("MemoryBufferIsFull", basePulsarException);
+    exceptions[ResultInterrupted] = createExceptionClass("Interrupted", basePulsarException);
 }
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 42644e9389c..94b0a1bbdfd 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -18,6 +18,7 @@
  */
 #include <gtest/gtest.h>
 #include <lib/BlockingQueue.h>
+#include <lib/Latch.h>
 
 #include <future>
 #include <iostream>
@@ -152,72 +153,6 @@ TEST(BlockingQueueTest, testTimeout) {
     ASSERT_FALSE(popReturn);
 }
 
-TEST(BlockingQueueTest, testReservedSpot) {
-    size_t size = 3;
-    BlockingQueue<int> queue(size);
-
-    ASSERT_TRUE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(0, queue.size());
-
-    queue.push(1);
-    ASSERT_FALSE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(1, queue.size());
-
-    BlockingQueue<int>::ReservedSpot spot1 = queue.reserve();
-    ASSERT_FALSE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(1, queue.size());
-
-    queue.push(2, spot1);
-
-    ASSERT_FALSE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(2, queue.size());
-
-    {
-        BlockingQueue<int>::ReservedSpot spot2 = queue.reserve();
-
-        ASSERT_FALSE(queue.empty());
-        ASSERT_TRUE(queue.full());
-        ASSERT_EQ(2, queue.size());
-    }
-
-    ASSERT_FALSE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(2, queue.size());
-
-    BlockingQueue<int>::ReservedSpot spot3 = queue.reserve();
-
-    int res;
-    queue.pop(res);
-    ASSERT_EQ(1, res);
-
-    ASSERT_FALSE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(1, queue.size());
-
-    queue.pop(res);
-    ASSERT_EQ(2, res);
-
-    ASSERT_TRUE(queue.empty());
-    ASSERT_FALSE(queue.full());
-    ASSERT_EQ(0, queue.size());
-
-    spot3.release();
-
-    {
-        BlockingQueue<int>::ReservedSpot spot1 = queue.reserve();
-        BlockingQueue<int>::ReservedSpot spot2 = queue.reserve();
-        BlockingQueue<int>::ReservedSpot spot3 = queue.reserve();
-
-        ASSERT_TRUE(queue.empty());
-        ASSERT_TRUE(queue.full());
-        ASSERT_EQ(0, queue.size());
-    }
-}
-
 TEST(BlockingQueueTest, testPushPopRace) {
     auto test_logic = []() {
         size_t size = 5;
@@ -254,3 +189,49 @@ TEST(BlockingQueueTest, testPushPopRace) {
 
     ASSERT_EXIT(test_logic(), ::testing::ExitedWithCode(0), "Exiting");
 }
+
+TEST(BlockingQueueTest, testCloseInterruptOnEmpty) {
+    BlockingQueue<int> queue(10);
+    pulsar::Latch latch(1);
+
+    auto thread = std::thread([&]() {
+        int v;
+        bool res = queue.pop(v);
+        ASSERT_FALSE(res);
+        latch.countdown();
+    });
+
+    // Sleep to allow for background thread to call pop and be blocked there
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    queue.close();
+    bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+    ASSERT_TRUE(wasUnblocked);
+    thread.join();
+}
+
+TEST(BlockingQueueTest, testCloseInterruptOnFull) {
+    BlockingQueue<int> queue(10);
+    pulsar::Latch latch(1);
+
+    auto thread = std::thread([&]() {
+        int i = 0;
+        while (true) {
+            bool res = queue.push(i++);
+            if (!res) {
+                latch.countdown();
+                return;
+            }
+        }
+    });
+
+    // Sleep to allow for background thread to fill the queue and be blocked there
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    queue.close();
+    bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+    ASSERT_TRUE(wasUnblocked);
+    thread.join();
+}
diff --git a/pulsar-client-cpp/tests/SemaphoreTest.cc b/pulsar-client-cpp/tests/SemaphoreTest.cc
index c62fff3dfb4..0cdec79feda 100644
--- a/pulsar-client-cpp/tests/SemaphoreTest.cc
+++ b/pulsar-client-cpp/tests/SemaphoreTest.cc
@@ -126,4 +126,25 @@ TEST(SemaphoreTest, testSingleRelease) {
     t1.join();
     t2.join();
     t3.join();
-}
\ No newline at end of file
+}
+
+TEST(SemaphoreTest, testCloseInterruptOnFull) {
+    Semaphore s(100);
+    s.acquire(100);
+    Latch latch(1);
+
+    auto thread = std::thread([&]() {
+        bool res = s.acquire(1);
+        ASSERT_FALSE(res);
+        latch.countdown();
+    });
+
+    // Sleep to allow for background thread to fill the queue and be blocked there
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    s.close();
+    bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+    ASSERT_TRUE(wasUnblocked);
+    thread.join();
+}
diff --git a/pulsar-client-cpp/tests/UnboundedBlockingQueueTest.cc b/pulsar-client-cpp/tests/UnboundedBlockingQueueTest.cc
new file mode 100644
index 00000000000..819c22e59ab
--- /dev/null
+++ b/pulsar-client-cpp/tests/UnboundedBlockingQueueTest.cc
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/UnboundedBlockingQueue.h>
+#include <lib/Latch.h>
+
+#include <future>
+#include <thread>
+
+class UnboundedProducerWorker {
+   private:
+    std::thread producerThread_;
+    UnboundedBlockingQueue<int>& queue_;
+
+   public:
+    UnboundedProducerWorker(UnboundedBlockingQueue<int>& queue) : queue_(queue) {}
+
+    void produce(int number) {
+        producerThread_ = std::thread(&UnboundedProducerWorker::pushNumbers, this, number);
+    }
+
+    void pushNumbers(int number) {
+        for (int i = 1; i <= number; i++) {
+            queue_.push(i);
+        }
+    }
+
+    void join() { producerThread_.join(); }
+};
+
+class UnboundedConsumerWorker {
+   private:
+    std::thread consumerThread_;
+    UnboundedBlockingQueue<int>& queue_;
+
+   public:
+    UnboundedConsumerWorker(UnboundedBlockingQueue<int>& queue) : queue_(queue) {}
+
+    void consume(int number) {
+        consumerThread_ = std::thread(&UnboundedConsumerWorker::popNumbers, this, number);
+    }
+
+    void popNumbers(int number) {
+        for (int i = 1; i <= number; i++) {
+            int poppedElement;
+            queue_.pop(poppedElement);
+        }
+    }
+
+    void join() { consumerThread_.join(); }
+};
+
+TEST(UnboundedBlockingQueueTest, testBasic) {
+    size_t size = 5;
+    UnboundedBlockingQueue<int> queue(size);
+
+    UnboundedProducerWorker producerWorker(queue);
+    producerWorker.produce(5);
+
+    UnboundedConsumerWorker consumerWorker(queue);
+    consumerWorker.consume(5);
+
+    producerWorker.join();
+    consumerWorker.join();
+
+    size_t zero = 0;
+    ASSERT_EQ(zero, queue.size());
+}
+
+TEST(UnboundedBlockingQueueTest, testQueueOperations) {
+    size_t size = 5;
+    UnboundedBlockingQueue<int> queue(size);
+    for (size_t i = 1; i <= size; i++) {
+        queue.push(i);
+    }
+    ASSERT_EQ(queue.size(), size);
+
+    int cnt = 1;
+    for (BlockingQueue<int>::const_iterator it = queue.begin(); it != queue.end(); it++) {
+        ASSERT_EQ(cnt, *it);
+        ++cnt;
+    }
+
+    cnt = 1;
+    for (BlockingQueue<int>::iterator it = queue.begin(); it != queue.end(); it++) {
+        ASSERT_EQ(cnt, *it);
+        ++cnt;
+    }
+
+    int poppedElement;
+    for (size_t i = 1; i <= size; i++) {
+        queue.pop(poppedElement);
+    }
+
+    ASSERT_FALSE(queue.peek(poppedElement));
+}
+
+TEST(UnboundedBlockingQueueTest, testBlockingProducer) {
+    size_t size = 5;
+    UnboundedBlockingQueue<int> queue(size);
+
+    UnboundedProducerWorker producerWorker(queue);
+    producerWorker.produce(8);
+
+    UnboundedConsumerWorker consumerWorker(queue);
+    consumerWorker.consume(5);
+
+    producerWorker.join();
+    consumerWorker.join();
+
+    size_t three = 3;
+    ASSERT_EQ(three, queue.size());
+}
+
+TEST(UnboundedBlockingQueueTest, testBlockingConsumer) {
+    size_t size = 5;
+    UnboundedBlockingQueue<int> queue(size);
+
+    UnboundedProducerWorker producerWorker(queue);
+    producerWorker.produce(5);
+
+    UnboundedConsumerWorker consumerWorker(queue);
+    consumerWorker.consume(8);
+
+    producerWorker.pushNumbers(3);
+
+    producerWorker.join();
+    consumerWorker.join();
+
+    size_t zero = 0;
+    ASSERT_EQ(zero, queue.size());
+}
+
+TEST(UnboundedBlockingQueueTest, testTimeout) {
+    size_t size = 5;
+    UnboundedBlockingQueue<int> queue(size);
+    int value;
+    bool popReturn = queue.pop(value, std::chrono::seconds(1));
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    ASSERT_FALSE(popReturn);
+}
+
+TEST(UnboundedBlockingQueueTest, testCloseInterruptOnEmpty) {
+    UnboundedBlockingQueue<int> queue(10);
+    pulsar::Latch latch(1);
+
+    auto thread = std::thread([&]() {
+        int v;
+        bool res = queue.pop(v);
+        ASSERT_FALSE(res);
+        latch.countdown();
+    });
+
+    // Sleep to allow for background thread to call pop and be blocked there
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    queue.close();
+    bool wasUnblocked = latch.wait(std::chrono::seconds(5));
+
+    ASSERT_TRUE(wasUnblocked);
+    thread.join();
+}
