This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit aa6d52c99949f6c9c7ebbf38fecb1a268a06b424
Author: erobot <[email protected]>
AuthorDate: Thu Dec 1 18:12:40 2022 +0800

    [fix] Close broker producer created after producer close (#131)
    
    ### Motivation
    
    Close the broker-side producer when it is created after the client producer 
has already been closed, so that it is not kept alive on the broker, and fix a related race condition.
    
    Race condition sequence:
    1. producer.start() is called.
    2. handleCreateProducer() is called and runs up to just before setCnx().
    3. closeAsync() is called and returns immediately because getCnx() returns 
nullptr, so no CloseProducer command is sent.
    4. handleCreateProducer() continues.
    5. Result: the producer is closed locally, but no CloseProducer command is ever sent to the broker.
    
    ### Modifications
    
    * Close the broker-side producer if it is created after the producer has been closed.
    * Use a mutex to prevent the race condition. handleCreateProducer() and 
closeAsync() are low-frequency operations, so taking the mutex here should not 
affect performance.
---
 lib/ProducerImpl.cc   | 35 +++++++++++++++++++++++++++--------
 tests/ProducerTest.cc | 43 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7fa3ff2..a3a5a95 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -172,6 +172,8 @@ void ProducerImpl::connectionFailed(Result result) {
 
 void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result 
result,
                                         const ResponseData& responseData) {
+    Lock lock(mutex_);
+
     LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << 
strResult(result));
 
     // make sure we're still in the Pending/Ready state, closeAsync could have 
been invoked
@@ -180,11 +182,21 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
     if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already 
closed");
         failPendingMessages(ResultAlreadyClosed, false);
+        if (result == ResultOk || result == ResultTimeout) {
+            auto client = client_.lock();
+            if (client) {
+                int requestId = client->newRequestId();
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId);
+            }
+        }
+        if (!producerCreatedPromise_.isComplete()) {
+            lock.unlock();
+            producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+        }
         return;
     }
 
     if (result == ResultOk) {
-        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. 
Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << 
cnx->cnxString());
@@ -203,7 +215,6 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
         setCnx(cnx);
         state_ = Ready;
         backoff_.reset();
-        lock.unlock();
 
         if (conf_.isEncryptionEnabled()) {
             auto weakSelf = weak_from_this();
@@ -226,6 +237,7 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
             startSendTimeoutTimer();
         }
 
+        lock.unlock();
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
@@ -234,22 +246,26 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
             // Creating the producer has timed out. We need to ensure the 
broker closes the producer
             // in case it was indeed created, otherwise it might prevent new 
create producer operation,
             // since we are not closing the connection
-            int requestId = client_.lock()->newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId);
+            auto client = client_.lock();
+            if (client) {
+                int requestId = client->newRequestId();
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId);
+            }
         }
 
         if (result == ResultProducerFenced) {
             state_ = Producer_Fenced;
-            failPendingMessages(result, true);
+            failPendingMessages(result, false);
             auto client = client_.lock();
             if (client) {
                 client->cleanupProducer(this);
             }
+            lock.unlock();
             producerCreatedPromise_.setFailed(result);
         } else if (producerCreatedPromise_.isComplete()) {
             if (result == ResultProducerBlockedQuotaExceededException) {
                 LOG_WARN(getName() << "Backlog is exceeded on topic. Sending 
exception to producer");
-                
failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
+                
failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
             } else if (result == ResultProducerBlockedQuotaExceededError) {
                 LOG_WARN(getName() << "Producer is blocked on creation because 
backlog is exceeded on topic");
             }
@@ -264,9 +280,10 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
                 scheduleReconnection(shared_from_this());
             } else {
                 LOG_ERROR(getName() << "Failed to create producer: " << 
strResult(result));
-                failPendingMessages(result, true);
-                producerCreatedPromise_.setFailed(result);
+                failPendingMessages(result, false);
                 state_ = Failed;
+                lock.unlock();
+                producerCreatedPromise_.setFailed(result);
             }
         }
     }
@@ -694,6 +711,8 @@ void ProducerImpl::closeAsync(CloseCallback 
originalCallback) {
         }
     };
 
+    Lock lock(mutex_);
+
     // if the producer was never started then there is nothing to clean up
     State expectedState = NotStarted;
     if (state_.compare_exchange_strong(expectedState, Closed)) {
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 74d3cf2..6969735 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -424,4 +424,47 @@ TEST(ProducerTest, testCloseSubProducerWhenFail) {
     client.close();
 }
 
+TEST(ProducerTest, testCloseProducerBeforeCreated) {
+    Client client(serviceUrl);
+
+    std::string ns = "test-close-producer-before-created";
+    std::string localName = std::string("testCloseProducerBeforeCreated") + 
std::to_string(time(nullptr));
+    std::string topicName = "persistent://public/" + ns + '/' + localName;
+    const int maxProducersPerTopic = 10;
+    const int partitionNum = 5;
+
+    // call admin api to create namespace with max prodcuer limit
+    std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
+    int res =
+        makePutRequest(url, "{\"max_producers_per_topic\": " + 
std::to_string(maxProducersPerTopic) + "}");
+    ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;
+
+    // call admin api to create partitioned topic
+    res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + 
localName + "/partitions",
+                         std::to_string(partitionNum));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setLazyStartPartitionedProducers(true);
+    
producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    producerConfiguration.setBatchingEnabled(false);
+
+    Message msg = MessageBuilder().setContent("test").build();
+    for (int i = 0; i < maxProducersPerTopic * 100; ++i) {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topicName, 
producerConfiguration, producer));
+        // trigger lazy producer creation
+        for (int j = 0; j < partitionNum; ++j) {
+            producer.sendAsync(msg, [](pulsar::Result, const 
pulsar::MessageId&) {});
+        }
+        producer.close();
+    }
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, {}, producer));
+    producer.close();
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

Reply via email to