This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new afeac78  Fix an issue where zero queue consumers are unable to receive messages after topic unloading (#473)
afeac78 is described below

commit afeac788d1e951951516b7aabee4c70af579dda2
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Sat Feb 8 14:46:42 2025 +0900

    Fix an issue where zero queue consumers are unable to receive messages after topic unloading (#473)
---
 lib/ConsumerImpl.cc        |  68 +++++++++++++++++++++++-------
 tests/ZeroQueueSizeTest.cc | 100 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 154 insertions(+), 14 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d540849..250845b 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -310,15 +310,23 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
     if (result == ResultOk) {
         LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
         {
-            Lock lock(mutex_);
+            Lock mutexLock(mutex_);
             setCnx(cnx);
             incomingMessages_.clear();
             possibleSendToDeadLetterTopicMessages_.clear();
             state_ = Ready;
             backoff_.reset();
-            // Complicated logic since we don't have a isLocked() function for mutex
-            if (waitingForZeroQueueSizeMessage) {
-                sendFlowPermitsToBroker(cnx, 1);
+            if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
+                // Complicated logic since we don't have a isLocked() function for mutex
+                if (waitingForZeroQueueSizeMessage) {
+                    sendFlowPermitsToBroker(cnx, 1);
+                }
+                // Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
+                // otherwise a deadlock will occur.
+                Lock pendingReceiveMutexLock(pendingReceiveMutex_);
+                if (!pendingReceives_.empty()) {
+                    sendFlowPermitsToBroker(cnx, pendingReceives_.size());
+                }
             }
             availablePermits_ = 0;
         }
@@ -915,7 +923,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
     }
 
     // Using RAII for locking
-    ClientConnectionPtr currentCnx = getCnx().lock();
     Lock lock(mutexForReceiveWithZeroQueueSize);
 
     // Just being cautious
@@ -924,9 +931,18 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
             getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
         incomingMessages_.clear();
     }
-    waitingForZeroQueueSizeMessage = true;
 
-    sendFlowPermitsToBroker(currentCnx, 1);
+    {
+        // Lock mutex_ to prevent a race condition with handleCreateConsumer.
+        // If handleCreateConsumer is executed after setting waitingForZeroQueueSizeMessage to true and
+        // before calling sendFlowPermitsToBroker, the result may be that a flow permit is sent twice.
+        Lock lock(mutex_);
+        waitingForZeroQueueSizeMessage = true;
+        // If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
+        // In other words, a flow permit will not be sent until setCnx(cnx) is executed in
+        // handleCreateConsumer.
+        sendFlowPermitsToBroker(getCnx().lock(), 1);
+    }
 
     while (true) {
         if (!incomingMessages_.pop(msg)) {
@@ -939,6 +955,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
             Lock localLock(mutex_);
             // if message received due to an old flow - discard it and wait for the message from the
             // latest flow command
+            ClientConnectionPtr currentCnx = getCnx().lock();
             if (msg.impl_->cnx_ == currentCnx.get()) {
                 waitingForZeroQueueSizeMessage = false;
                 // Can't use break here else it may trigger a race with connection opened.
@@ -966,19 +983,42 @@ void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
         return;
     }
 
-    Lock lock(pendingReceiveMutex_);
+    if (messageListener_) {
+        LOG_ERROR(getName() << "Can not receive when a listener has been set");
+        callback(ResultInvalidConfiguration, msg);
+        return;
+    }
+
+    Lock mutexlock(mutex_, std::defer_lock);
+    if (config_.getReceiverQueueSize() == 0) {
+        // Lock mutex_ to prevent a race condition with handleCreateConsumer.
+        // If handleCreateConsumer is executed after pushing the callback to pendingReceives_ and
+        // before calling sendFlowPermitsToBroker, the result may be that a flow permit is sent twice.
+        // Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
+        // otherwise a deadlock will occur.
+        mutexlock.lock();
+    }
+
+    Lock pendingReceiveMutexLock(pendingReceiveMutex_);
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
-        lock.unlock();
+        pendingReceiveMutexLock.unlock();
+        if (config_.getReceiverQueueSize() == 0) {
+            mutexlock.unlock();
+        }
         messageProcessed(msg);
         msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
         callback(ResultOk, msg);
+    } else if (config_.getReceiverQueueSize() == 0) {
+        pendingReceives_.push(callback);
+        // If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
+        // In other words, a flow permit will not be sent until setCnx(cnx) is executed in
+        // handleCreateConsumer.
+        sendFlowPermitsToBroker(getCnx().lock(), 1);
+        pendingReceiveMutexLock.unlock();
+        mutexlock.unlock();
     } else {
         pendingReceives_.push(callback);
-        lock.unlock();
-
-        if (config_.getReceiverQueueSize() == 0) {
-            sendFlowPermitsToBroker(getCnx().lock(), 1);
-        }
+        pendingReceiveMutexLock.unlock();
     }
 }
 
diff --git a/tests/ZeroQueueSizeTest.cc b/tests/ZeroQueueSizeTest.cc
index 644f42c..b3ed066 100644
--- a/tests/ZeroQueueSizeTest.cc
+++ b/tests/ZeroQueueSizeTest.cc
@@ -27,6 +27,7 @@
 #include <mutex>
 
 #include "ConsumerTest.h"
+#include "HttpHelper.h"
 #include "lib/Latch.h"
 #include "lib/LogUtils.h"
 
@@ -37,6 +38,7 @@ using namespace pulsar;
 static int totalMessages = 10;
 static int globalCount = 0;
 static std::string lookupUrl = "pulsar://localhost:6650";
+static std::string adminUrl = "http://localhost:8080";
 static std::string contentBase = "msg-";
 
 static void messageListenerFunction(Consumer consumer, const Message& msg, Latch& latch) {
@@ -287,3 +289,101 @@ TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
 
     client.close();
 }
+
+class ZeroQueueSizeTest : public ::testing::TestWithParam<bool> {};
+
+TEST_P(ZeroQueueSizeTest, testReceptionAfterUnloading) {
+    Client client(lookupUrl);
+    auto isAsync = GetParam();
+    std::string topicName = "zero-queue-size-reception-after-unloading";
+    if (isAsync) {
+        topicName += "-async";
+    }
+    std::string subName = "my-sub";
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consConfig;
+    consConfig.setReceiverQueueSize(0);
+    result = client.subscribe(topicName, subName, consConfig, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    for (int i = 0; i < totalMessages / 2; i++) {
+        std::ostringstream ss;
+        ss << contentBase << i;
+        Message msg = MessageBuilder().setContent(ss.str()).build();
+        result = producer.send(msg);
+        ASSERT_EQ(ResultOk, result);
+    }
+
+    for (int i = 0; i < totalMessages / 2; i++) {
+        ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+        std::ostringstream ss;
+        ss << contentBase << i;
+        if (isAsync) {
+            Latch latch(1);
+            consumer.receiveAsync([&consumer, &ss, &latch](Result res, const Message& receivedMsg) {
+                ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+                ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+                ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+                latch.countdown();
+            });
+            ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+        } else {
+            Message receivedMsg;
+            consumer.receive(receivedMsg);
+            ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+            ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+            ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+        }
+    }
+
+    // Wait for messages to be delivered while performing `receive` or `receiveAsync` in a separate thread.
+    // At this time, the value of availablePermits should be 1.
+    std::thread consumeThread([&consumer, &isAsync] {
+        for (int i = totalMessages / 2; i < totalMessages; i++) {
+            ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+            std::ostringstream ss;
+            ss << contentBase << i;
+            if (isAsync) {
+                Latch latch(1);
+                consumer.receiveAsync([&consumer, &ss, &latch](Result res, const Message& receivedMsg) {
+                    ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+                    ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+                    ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+                    latch.countdown();
+                });
+                ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+            } else {
+                Message receivedMsg;
+                consumer.receive(receivedMsg);
+                ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+                ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+                ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+            }
+        }
+    });
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    int res = makePutRequest(adminUrl + "/admin/v2/persistent/public/default/" + topicName + "/unload", "");
+    ASSERT_TRUE(res / 100 == 2) << "res: " << res;
+
+    for (int i = totalMessages / 2; i < totalMessages; i++) {
+        std::ostringstream ss;
+        ss << contentBase << i;
+        Message msg = MessageBuilder().setContent(ss.str()).build();
+        result = producer.send(msg);
+        ASSERT_EQ(ResultOk, result);
+    }
+
+    consumeThread.join();
+    consumer.unsubscribe();
+    consumer.close();
+    producer.close();
+    client.close();
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ZeroQueueSizeTest, ::testing::Values(false, true));

Reply via email to