This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new afeac78 Fix an issue where zero queue consumers are unable to receive
messages after topic unloading (#473)
afeac78 is described below
commit afeac788d1e951951516b7aabee4c70af579dda2
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Sat Feb 8 14:46:42 2025 +0900
Fix an issue where zero queue consumers are unable to receive messages
after topic unloading (#473)
---
lib/ConsumerImpl.cc | 68 +++++++++++++++++++++++-------
tests/ZeroQueueSizeTest.cc | 100 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 154 insertions(+), 14 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d540849..250845b 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -310,15 +310,23 @@ Result ConsumerImpl::handleCreateConsumer(const
ClientConnectionPtr& cnx, Result
if (result == ResultOk) {
LOG_INFO(getName() << "Created consumer on broker " <<
cnx->cnxString());
{
- Lock lock(mutex_);
+ Lock mutexLock(mutex_);
setCnx(cnx);
incomingMessages_.clear();
possibleSendToDeadLetterTopicMessages_.clear();
state_ = Ready;
backoff_.reset();
- // Complicated logic since we don't have a isLocked() function for
mutex
- if (waitingForZeroQueueSizeMessage) {
- sendFlowPermitsToBroker(cnx, 1);
+ if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
+ // Complicated logic since we don't have a isLocked() function
for mutex
+ if (waitingForZeroQueueSizeMessage) {
+ sendFlowPermitsToBroker(cnx, 1);
+ }
+ // Note that the order of lock acquisition must be mutex_ ->
pendingReceiveMutex_,
+ // otherwise a deadlock will occur.
+ Lock pendingReceiveMutexLock(pendingReceiveMutex_);
+ if (!pendingReceives_.empty()) {
+ sendFlowPermitsToBroker(cnx, pendingReceives_.size());
+ }
}
availablePermits_ = 0;
}
@@ -915,7 +923,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message&
msg) {
}
// Using RAII for locking
- ClientConnectionPtr currentCnx = getCnx().lock();
Lock lock(mutexForReceiveWithZeroQueueSize);
// Just being cautious
@@ -924,9 +931,18 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message&
msg) {
getName() << "The incoming message queue should never be greater
than 0 when Queue size is 0");
incomingMessages_.clear();
}
- waitingForZeroQueueSizeMessage = true;
- sendFlowPermitsToBroker(currentCnx, 1);
+ {
+ // Lock mutex_ to prevent a race condition with handleCreateConsumer.
+ // If handleCreateConsumer is executed after setting
waitingForZeroQueueSizeMessage to true and
+ // before calling sendFlowPermitsToBroker, the result may be that a
flow permit is sent twice.
+ Lock lock(mutex_);
+ waitingForZeroQueueSizeMessage = true;
+ // If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
+ // In other words, a flow permit will not be sent until setCnx(cnx) is
executed in
+ // handleCreateConsumer.
+ sendFlowPermitsToBroker(getCnx().lock(), 1);
+ }
while (true) {
if (!incomingMessages_.pop(msg)) {
@@ -939,6 +955,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message&
msg) {
Lock localLock(mutex_);
// if message received due to an old flow - discard it and wait
for the message from the
// latest flow command
+ ClientConnectionPtr currentCnx = getCnx().lock();
if (msg.impl_->cnx_ == currentCnx.get()) {
waitingForZeroQueueSizeMessage = false;
// Can't use break here else it may trigger a race with
connection opened.
@@ -966,19 +983,42 @@ void ConsumerImpl::receiveAsync(ReceiveCallback callback)
{
return;
}
- Lock lock(pendingReceiveMutex_);
+ if (messageListener_) {
+ LOG_ERROR(getName() << "Can not receive when a listener has been set");
+ callback(ResultInvalidConfiguration, msg);
+ return;
+ }
+
+ Lock mutexlock(mutex_, std::defer_lock);
+ if (config_.getReceiverQueueSize() == 0) {
+ // Lock mutex_ to prevent a race condition with handleCreateConsumer.
+ // If handleCreateConsumer is executed after pushing the callback to
pendingReceives_ and
+ // before calling sendFlowPermitsToBroker, the result may be that a
flow permit is sent twice.
+ // Note that the order of lock acquisition must be mutex_ ->
pendingReceiveMutex_,
+ // otherwise a deadlock will occur.
+ mutexlock.lock();
+ }
+
+ Lock pendingReceiveMutexLock(pendingReceiveMutex_);
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
- lock.unlock();
+ pendingReceiveMutexLock.unlock();
+ if (config_.getReceiverQueueSize() == 0) {
+ mutexlock.unlock();
+ }
messageProcessed(msg);
msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
callback(ResultOk, msg);
+ } else if (config_.getReceiverQueueSize() == 0) {
+ pendingReceives_.push(callback);
+ // If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
+ // In other words, a flow permit will not be sent until setCnx(cnx) is
executed in
+ // handleCreateConsumer.
+ sendFlowPermitsToBroker(getCnx().lock(), 1);
+ pendingReceiveMutexLock.unlock();
+ mutexlock.unlock();
} else {
pendingReceives_.push(callback);
- lock.unlock();
-
- if (config_.getReceiverQueueSize() == 0) {
- sendFlowPermitsToBroker(getCnx().lock(), 1);
- }
+ pendingReceiveMutexLock.unlock();
}
}
diff --git a/tests/ZeroQueueSizeTest.cc b/tests/ZeroQueueSizeTest.cc
index 644f42c..b3ed066 100644
--- a/tests/ZeroQueueSizeTest.cc
+++ b/tests/ZeroQueueSizeTest.cc
@@ -27,6 +27,7 @@
#include <mutex>
#include "ConsumerTest.h"
+#include "HttpHelper.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
@@ -37,6 +38,7 @@ using namespace pulsar;
static int totalMessages = 10;
static int globalCount = 0;
static std::string lookupUrl = "pulsar://localhost:6650";
+static std::string adminUrl = "http://localhost:8080";
static std::string contentBase = "msg-";
static void messageListenerFunction(Consumer consumer, const Message& msg,
Latch& latch) {
@@ -287,3 +289,101 @@ TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
client.close();
}
+
+class ZeroQueueSizeTest : public ::testing::TestWithParam<bool> {};
+
+TEST_P(ZeroQueueSizeTest, testReceptionAfterUnloading) {
+ Client client(lookupUrl);
+ auto isAsync = GetParam();
+ std::string topicName = "zero-queue-size-reception-after-unloading";
+ if (isAsync) {
+ topicName += "-async";
+ }
+ std::string subName = "my-sub";
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ ConsumerConfiguration consConfig;
+ consConfig.setReceiverQueueSize(0);
+ result = client.subscribe(topicName, subName, consConfig, consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ for (int i = 0; i < totalMessages / 2; i++) {
+ std::ostringstream ss;
+ ss << contentBase << i;
+ Message msg = MessageBuilder().setContent(ss.str()).build();
+ result = producer.send(msg);
+ ASSERT_EQ(ResultOk, result);
+ }
+
+ for (int i = 0; i < totalMessages / 2; i++) {
+ ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+ std::ostringstream ss;
+ ss << contentBase << i;
+ if (isAsync) {
+ Latch latch(1);
+ consumer.receiveAsync([&consumer, &ss, &latch](Result res, const
Message& receivedMsg) {
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+ ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+ latch.countdown();
+ });
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+ } else {
+ Message receivedMsg;
+ consumer.receive(receivedMsg);
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+ ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+ }
+ }
+
+ // Wait for messages to be delivered while performing `receive` or
`receiveAsync` in a separate thread.
+ // At this time, the value of availablePermits should be 1.
+ std::thread consumeThread([&consumer, &isAsync] {
+ for (int i = totalMessages / 2; i < totalMessages; i++) {
+ ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+ std::ostringstream ss;
+ ss << contentBase << i;
+ if (isAsync) {
+ Latch latch(1);
+ consumer.receiveAsync([&consumer, &ss, &latch](Result res,
const Message& receivedMsg) {
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+ ASSERT_EQ(0,
ConsumerTest::getNumOfMessagesInQueue(consumer));
+ latch.countdown();
+ });
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+ } else {
+ Message receivedMsg;
+ consumer.receive(receivedMsg);
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
+ ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
+ }
+ }
+ });
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ int res = makePutRequest(adminUrl + "/admin/v2/persistent/public/default/"
+ topicName + "/unload", "");
+ ASSERT_TRUE(res / 100 == 2) << "res: " << res;
+
+ for (int i = totalMessages / 2; i < totalMessages; i++) {
+ std::ostringstream ss;
+ ss << contentBase << i;
+ Message msg = MessageBuilder().setContent(ss.str()).build();
+ result = producer.send(msg);
+ ASSERT_EQ(ResultOk, result);
+ }
+
+ consumeThread.join();
+ consumer.unsubscribe();
+ consumer.close();
+ producer.close();
+ client.close();
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ZeroQueueSizeTest, ::testing::Values(false,
true));