This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 85b1b53  [fix] Fix PartitionedProducerImpl::closeAsync to close 
sub-producers properly (#125)
85b1b53 is described below

commit 85b1b53f1e9ed571d813276bde4f9ad7b31b5659
Author: erobot <[email protected]>
AuthorDate: Mon Nov 28 15:37:31 2022 +0800

    [fix] Fix PartitionedProducerImpl::closeAsync to close sub-producers 
properly (#125)
    
    ### Motivation
    
    PartitionedProducerImpl does not close sub-producers properly when any 
sub-producer creation fails. Continuing to retry producer creation will 
eventually reach the maximum producer limit. This appears to be a regression 
caused by #54.
    
    When sub-producer creation fails, state_ is set to Failed. 
PartitionedProducerImpl::closeAsync only does cleanup when state_ == Ready, so 
sub-producers are not closed when state_ == Failed.
    
    
https://github.com/apache/pulsar-client-cpp/blob/f0268ecd29a6d0030b7d07379ec609884b4c14ff/lib/PartitionedProducerImpl.cc#L273-L276
    
    ### Modifications
    
    Close sub-producers when state != Closed.
---
 lib/PartitionedProducerImpl.cc |  7 ++----
 tests/ProducerTest.cc          | 51 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 26d5796..0b5d452 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -266,14 +266,11 @@ void PartitionedProducerImpl::closeAsync(CloseCallback 
originalCallback) {
             originalCallback(result);
         }
     };
-    if (state_ == Closed) {
+
+    if (state_ == Closed || state_.exchange(Closing) == Closing) {
         closeCallback(ResultAlreadyClosed);
         return;
     }
-    State expectedState = Ready;
-    if (!state_.compare_exchange_strong(expectedState, Closing)) {
-        return;
-    }
 
     cancelTimers();
 
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index a0b1e7e..74d3cf2 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -373,4 +373,55 @@ TEST_P(ProducerTest, testFlushNoBatch) {
     client.close();
 }
 
+TEST(ProducerTest, testCloseSubProducerWhenFail) {
+    Client client(serviceUrl);
+
+    std::string ns = "test-close-sub-producer-when-fail";
+    std::string localName = std::string("testCloseSubProducerWhenFail") + 
std::to_string(time(nullptr));
+    std::string topicName = "persistent://public/" + ns + '/' + localName;
+    const int maxProducersPerTopic = 10;
+    const int partitionNum = 5;
+
+    // Call the admin API to create the namespace with a max producer limit; 204 and 409 are both accepted.
+    std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
+    int res =
+        makePutRequest(url, "{\"max_producers_per_topic\": " + 
std::to_string(maxProducersPerTopic) + "}");
+    ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;
+
+    // Call the admin API to create the partitioned topic with partitionNum partitions.
+    res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + 
localName + "/partitions",
+                         std::to_string(partitionNum));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(false);
+
+    // Create producers on partition-0 until the max producer limit is reached.
+    std::vector<Producer> producers;
+    for (int i = 0; i < maxProducersPerTopic; ++i) {
+        Producer producer;
+        ASSERT_EQ(ResultOk,
+                  client.createProducer(topicName + "-partition-0", 
producerConfiguration, producer));
+        producers.push_back(producer);
+    }
+
+    // create partitioned producer, should fail because partition-0 already 
reach max producer limit
+    for (int i = 0; i < maxProducersPerTopic; ++i) {
+        Producer producer;
+        ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, 
producer));
+    }
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));  // NOTE(review): presumably lets pending sub-producer closes complete - confirm
+
+    // Create a producer on partition-1; expected to succeed if the failed attempts above released their sub-producers.
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName + "-partition-1", 
producerConfiguration, producer));
+    producers.push_back(producer);
+
+    for (auto& producer : producers) {
+        producer.close();
+    }
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

Reply via email to