This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 85b1b53 [fix] Fix PartitionedProducerImpl::closeAsync to close
sub-producers properly (#125)
85b1b53 is described below
commit 85b1b53f1e9ed571d813276bde4f9ad7b31b5659
Author: erobot <[email protected]>
AuthorDate: Mon Nov 28 15:37:31 2022 +0800
[fix] Fix PartitionedProducerImpl::closeAsync to close sub-producers
properly (#125)
### Motivation
PartitionedProducerImpl does not close its sub-producers properly when any
sub-producer creation fails. Continuing to retry creating the producer will
eventually reach the maximum producer limit. This seems to be a regression
caused by #54.
When sub-producer creation fails, state_ is set to Failed.
PartitionedProducerImpl::closeAsync only does cleanup when state_ == Ready, so
the sub-producers are never closed when state_ == Failed.
https://github.com/apache/pulsar-client-cpp/blob/f0268ecd29a6d0030b7d07379ec609884b4c14ff/lib/PartitionedProducerImpl.cc#L273-L276
### Modifications
Close sub-producers whenever state_ is neither Closed nor already Closing.
---
lib/PartitionedProducerImpl.cc | 7 ++----
tests/ProducerTest.cc | 51 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 26d5796..0b5d452 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -266,14 +266,11 @@ void PartitionedProducerImpl::closeAsync(CloseCallback
originalCallback) {
originalCallback(result);
}
};
- if (state_ == Closed) {
+
+ if (state_ == Closed || state_.exchange(Closing) == Closing) {
closeCallback(ResultAlreadyClosed);
return;
}
- State expectedState = Ready;
- if (!state_.compare_exchange_strong(expectedState, Closing)) {
- return;
- }
cancelTimers();
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index a0b1e7e..74d3cf2 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -373,4 +373,55 @@ TEST_P(ProducerTest, testFlushNoBatch) {
client.close();
}
+TEST(ProducerTest, testCloseSubProducerWhenFail) {
+ Client client(serviceUrl);
+
+ std::string ns = "test-close-sub-producer-when-fail";
+ std::string localName = std::string("testCloseSubProducerWhenFail") +
std::to_string(time(nullptr));
+ std::string topicName = "persistent://public/" + ns + '/' + localName;
+ const int maxProducersPerTopic = 10;
+ const int partitionNum = 5;
+
+ // call admin api to create namespace with max prodcuer limit
+ std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
+ int res =
+ makePutRequest(url, "{\"max_producers_per_topic\": " +
std::to_string(maxProducersPerTopic) + "}");
+ ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;
+
+ // call admin api to create partitioned topic
+ res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" +
localName + "/partitions",
+ std::to_string(partitionNum));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ ProducerConfiguration producerConfiguration;
+ producerConfiguration.setBatchingEnabled(false);
+
+ // create producers for partition-0 up to max producer limit
+ std::vector<Producer> producers;
+ for (int i = 0; i < maxProducersPerTopic; ++i) {
+ Producer producer;
+ ASSERT_EQ(ResultOk,
+ client.createProducer(topicName + "-partition-0",
producerConfiguration, producer));
+ producers.push_back(producer);
+ }
+
+ // create partitioned producer, should fail because partition-0 already
reach max producer limit
+ for (int i = 0; i < maxProducersPerTopic; ++i) {
+ Producer producer;
+ ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName,
producer));
+ }
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // create producer for partition-1, should succeed
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName + "-partition-1",
producerConfiguration, producer));
+ producers.push_back(producer);
+
+ for (auto& producer : producers) {
+ producer.close();
+ }
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));