This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 95b5ef8be37da96c99711bddad2d7996dd80007d Author: Yunze Xu <[email protected]> AuthorDate: Thu Apr 14 14:59:46 2022 +0800 [C++] Fix UnknownError might be returned for a partitioned producer (#15161) Fixes #15078 When a partitioned producer is created and some of the partitions fail to create their producers, `closeAsync` is called immediately, even if other partitions are still in the process of creating their associated single producers. Since `closeAsync` is called before `setFailed` is called on the `partitionedProducerCreatedPromise_` field, there is a race condition in which all single producers are closed before the promise is set. The promise is then set with `ResultUnknownError`, see https://github.com/apache/pulsar/blob/4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5/pulsar-client-cpp/lib/PartitionedProducerImpl.cc#L317. Call `closeAsync` only after all single producers have either failed or succeeded, and only if at least one of them failed. Also ensure that `partitionedProducerCreatedPromise_` is completed before `closeAsync` is called. This PR also makes the state of a partitioned producer atomic, because protecting it with a mutex makes the code hard to write. Create a separate namespace `public/test-backlog-quotas` to test the case where the backlog quota is exceeded. Then add the `testBacklogQuotasExceeded` test, which builds up some backlog by creating a consumer and sending messages to one partition of the topic. In this test, only 1 partition has a backlog and fails with the related error, so the test verifies that `createProducer` returns the correct error instead of `ResultUnknownError`. 
(cherry picked from commit 0f8559611914a17efe222bf6ed1dd621d5a9cf45) --- pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 55 +++++++++--------------- pulsar-client-cpp/lib/PartitionedProducerImpl.h | 12 ++---- pulsar-client-cpp/pulsar-test-service-start.sh | 4 ++ pulsar-client-cpp/tests/ProducerTest.cc | 43 ++++++++++++++++++ 4 files changed, 71 insertions(+), 43 deletions(-) diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index bdd23ed6c91..9f197f0d45f 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -136,28 +136,29 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result unsigned int partitionIndex) { // to indicate, we are doing cleanup using closeAsync after producer create // has failed and the invocation of closeAsync is not from client - CloseCallback closeCallback = NULL; - Lock lock(mutex_); + const auto numPartitions = getNumPartitionsWithLock(); + assert(numProducersCreated_ <= numPartitions && partitionIndex <= numPartitions); + if (state_ == Failed) { - // Ignore, we have already informed client that producer creation failed + // We have already informed client that producer creation failed + if (++numProducersCreated_ == numPartitions) { + closeAsync(nullptr); + } return; } - const auto numPartitions = getNumPartitionsWithLock(); - assert(numProducersCreated_ <= numPartitions); + if (result != ResultOk) { - state_ = Failed; - lock.unlock(); - closeAsync(closeCallback); - partitionedProducerCreatedPromise_.setFailed(result); LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result); + partitionedProducerCreatedPromise_.setFailed(result); + state_ = Failed; + if (++numProducersCreated_ == numPartitions) { + closeAsync(nullptr); + } return; } - assert(partitionIndex <= numPartitions); - numProducersCreated_++; - if (numProducersCreated_ == 
numPartitions) { + if (++numProducersCreated_ == numPartitions) { state_ = Ready; - lock.unlock(); if (partitionsUpdateTimer_) { runPartitionUpdateTask(); } @@ -181,7 +182,7 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition // override void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) { - if (!assertState(Ready)) { + if (state_ != Ready) { callback(ResultAlreadyClosed, msg.getMessageId()); return; } @@ -211,18 +212,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac } // override -void PartitionedProducerImpl::shutdown() { setState(Closed); } - -void PartitionedProducerImpl::setState(const PartitionedProducerState state) { - Lock lock(mutex_); - state_ = state; - lock.unlock(); -} - -bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) { - Lock lock(mutex_); - return state_ == state; -} +void PartitionedProducerImpl::shutdown() { state_ = Closed; } const std::string& PartitionedProducerImpl::getProducerName() const { Lock producersLock(producersMutex_); @@ -251,7 +241,10 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const { * create one or many producers for partitions. 
So, we have to notify with ERROR on createProducerFailure */ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) { - setState(Closing); + if (state_ == Closing || state_ == Closed) { + return; + } + state_ = Closing; unsigned int producerAlreadyClosed = 0; @@ -280,7 +273,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) { * handleSinglePartitionProducerCreated */ if (producerAlreadyClosed == numProducers && closeCallback) { - setState(Closed); + state_ = Closed; closeCallback(ResultOk); } } @@ -288,14 +281,12 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) { void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex, CloseCallback callback) { - Lock lock(mutex_); if (state_ == Failed) { // we should have already notified the client by callback return; } if (result != ResultOk) { state_ = Failed; - lock.unlock(); LOG_ERROR("Closing the producer failed for partition - " << partitionIndex); if (callback) { callback(result); @@ -309,7 +300,6 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result, // closed all successfully if (!numProducersCreated_) { state_ = Closed; - lock.unlock(); // set the producerCreatedPromise to failure, if client called // closeAsync and it's not failure to create producer, the promise // is set second time here, first time it was successful. 
So check @@ -395,7 +385,6 @@ void PartitionedProducerImpl::getPartitionMetadata() { void PartitionedProducerImpl::handleGetPartitions(Result result, const LookupDataResultPtr& lookupDataResult) { - Lock stateLock(mutex_); if (state_ != Ready) { return; } @@ -428,11 +417,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result, } bool PartitionedProducerImpl::isConnected() const { - Lock stateLock(mutex_); if (state_ != Ready) { return false; } - stateLock.unlock(); Lock producersLock(producersMutex_); const auto producers = producers_; diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h index 874d6cda526..0a8c10e2213 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h @@ -30,7 +30,7 @@ namespace pulsar { class PartitionedProducerImpl : public ProducerImplBase, public std::enable_shared_from_this<PartitionedProducerImpl> { public: - enum PartitionedProducerState + enum State { Pending, Ready, @@ -73,8 +73,6 @@ class PartitionedProducerImpl : public ProducerImplBase, void notifyResult(CloseCallback closeCallback); - void setState(PartitionedProducerState state); - friend class PulsarFriend; private: @@ -83,7 +81,7 @@ class PartitionedProducerImpl : public ProducerImplBase, const TopicNamePtr topicName_; const std::string topic_; - unsigned int numProducersCreated_ = 0; + std::atomic_uint numProducersCreated_{0}; /* * set when one or more Single Partition Creation fails, close will cleanup and fail the create callbackxo @@ -99,10 +97,7 @@ class PartitionedProducerImpl : public ProducerImplBase, mutable std::mutex producersMutex_; MessageRoutingPolicyPtr routerPolicy_; - // mutex_ is used to share state_, and numProducersCreated_ - mutable std::mutex mutex_; - - PartitionedProducerState state_ = Pending; + std::atomic<State> state_{Pending}; // only set this promise to value, when producers on all partitions are created. 
Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_; @@ -124,7 +119,6 @@ class PartitionedProducerImpl : public ProducerImplBase, void runPartitionUpdateTask(); void getPartitionMetadata(); void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata); - bool assertState(const PartitionedProducerState state); }; } // namespace pulsar diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh index 248d628b9c2..2bee18e64b9 100755 --- a/pulsar-client-cpp/pulsar-test-service-start.sh +++ b/pulsar-client-cpp/pulsar-test-service-start.sh @@ -106,6 +106,10 @@ $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-4 \ --role "anonymous" $PULSAR_DIR/bin/pulsar-admin namespaces set-encryption-required public/default-4 -e +# Create "public/test-backlog-quotas" to test backlog quotas policy +$PULSAR_DIR/bin/pulsar-admin namespaces create public/test-backlog-quotas \ + --clusters standalone + # Create "private" tenant $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone" diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 210f01345d4..14461429da3 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -240,4 +240,47 @@ TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) { client.close(); LOG_INFO("End of run " << run); } +} + +TEST(ProducerTest, testBacklogQuotasExceeded) { + std::string ns = "public/test-backlog-quotas"; + std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr)); + + int res = makePutRequest(adminUrl + "admin/v2/persistent/" + topic + "/partitions", "5"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + LOG_INFO("Created topic " << topic << " with 5 partitions"); + + auto setBacklogPolicy = [&ns](const std::string& policy, int limitSize) { + const auto body = + 
R"({"policy":")" + policy + R"(","limitSize":)" + std::to_string(limitSize) + "}"; + int res = makePostRequest(adminUrl + "admin/v2/namespaces/" + ns + "/backlogQuota", body); + LOG_INFO(res << " | Change the backlog policy to: " << body); + ASSERT_TRUE(res == 204 || res == 409); + }; + + Client client(serviceUrl); + + // Create a topic with backlog size that is greater than 1024 + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); // create a cursor + Producer producer; + + const auto partition = topic + "-partition-0"; + ASSERT_EQ(ResultOk, client.createProducer(partition, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(std::string(1024L, 'a')).build())); + ASSERT_EQ(ResultOk, producer.close()); + + setBacklogPolicy("producer_request_hold", 1024); + ASSERT_EQ(ResultProducerBlockedQuotaExceededError, client.createProducer(topic, producer)); + ASSERT_EQ(ResultProducerBlockedQuotaExceededError, client.createProducer(partition, producer)); + + setBacklogPolicy("producer_exception", 1024); + ASSERT_EQ(ResultProducerBlockedQuotaExceededException, client.createProducer(topic, producer)); + ASSERT_EQ(ResultProducerBlockedQuotaExceededException, client.createProducer(partition, producer)); + + setBacklogPolicy("consumer_backlog_eviction", 1024); + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, client.createProducer(partition, producer)); + + client.close(); } \ No newline at end of file
